erc20_rpc_pool/rpc_pool/
eth_generic_call.rs

1use crate::rpc_pool::web3_error_list::check_if_proper_rpc_error;
2use crate::rpc_pool::VerifyEndpointResult;
3use crate::Web3RpcPool;
4use erc20_payment_lib_common::{
5    DriverEvent, DriverEventContent, Web3RpcPoolContent, Web3RpcPoolInfo,
6};
7use serde::de::DeserializeOwned;
8use std::sync::Arc;
9use std::time::Duration;
10use web3::{api::Eth, helpers::CallFuture};
11
12pub trait EthMethod<T: web3::Transport> {
13    const METHOD: &'static str;
14    type Args: Clone;
15    type Return: DeserializeOwned;
16
17    fn do_call(eth: Eth<T>, args: Self::Args) -> CallFuture<Self::Return, T::Out>;
18}
19
20impl Web3RpcPool {
21    pub async fn eth_generic_call<EthMethodCall: EthMethod<web3::transports::Http>>(
22        self: Arc<Self>,
23        args: EthMethodCall::Args,
24    ) -> Result<EthMethodCall::Return, web3::Error> {
25        let mut loop_no = 0;
26        const LOOP_COUNT: usize = 4;
27        loop {
28            let resp = self.clone().choose_best_endpoints().await;
29            if resp.allowed_endpoints.is_empty() && !resp.is_resolving {
30                log::warn!("No valid endpoints found for chain id {}, wait until next check. Call yagna payment driver rpc --verify for details", self.chain_id);
31                return Err(web3::Error::Unreachable);
32            }
33            let idx_vec = resp.allowed_endpoints;
34            if let Some(idx_chosen) = idx_vec.first() {
35                self.mark_rpc_chosen(*idx_chosen);
36            }
37
38            if idx_vec.is_empty() {
39                if loop_no >= LOOP_COUNT {
40                    if let Some(event_sender) =
41                        self.event_sender.clone().and_then(|es| es.upgrade())
42                    {
43                        let _ = event_sender
44                            .send(DriverEvent {
45                                create_date: chrono::Utc::now(),
46                                content: DriverEventContent::Web3RpcMessage(Web3RpcPoolInfo {
47                                    chain_id: self.chain_id,
48                                    content: Web3RpcPoolContent::AllEndpointsFailed,
49                                }),
50                            })
51                            .await;
52                    }
53                    log::warn!(
54                        "Seems like all RPC endpoints failed - chain id: {}",
55                        self.chain_id
56                    );
57                    return Err(web3::Error::Unreachable);
58                }
59                // sleep for 800, 1200, 2000, 2800 ms - total max sleep time is 6800 ms
60                let sleep_times: [u64; LOOP_COUNT] = [800, 1200, 2000, 2800];
61                tokio::time::sleep(Duration::from_millis(sleep_times[loop_no])).await;
62                loop_no += 1;
63                continue;
64            }
65
66            for idx in idx_vec {
67                let res = match self.get_web3(idx) {
68                    Some(web3) => tokio::time::timeout(
69                        self.get_max_timeout(idx),
70                        EthMethodCall::do_call(web3.eth(), args.clone()),
71                    ),
72                    None => {
73                        //this case is possible if endpoint is removed from pool, just skip it and try next one
74                        log::warn!("No web3 instance found on specified index");
75                        continue;
76                    }
77                };
78
79                let err = match res.await {
80                    Ok(Ok(balance)) => {
81                        self.mark_rpc_success(idx, EthMethodCall::METHOD.to_string());
82                        if let Some(event_sender) =
83                            self.event_sender.clone().and_then(|es| es.upgrade())
84                        {
85                            let _ = event_sender
86                                .send(DriverEvent {
87                                    create_date: chrono::Utc::now(),
88                                    content: DriverEventContent::Web3RpcMessage(Web3RpcPoolInfo {
89                                        chain_id: self.chain_id,
90                                        content: Web3RpcPoolContent::Success,
91                                    }),
92                                })
93                                .await;
94                        }
95                        return Ok(balance);
96                    }
97                    Ok(Err(e)) => match e {
98                        web3::Error::Rpc(e) => {
99                            let proper = check_if_proper_rpc_error(&e.to_string());
100                            if proper {
101                                self.mark_rpc_success(idx, EthMethodCall::METHOD.to_string());
102                                if let Some(event_sender) =
103                                    self.event_sender.clone().and_then(|es| es.upgrade())
104                                {
105                                    let _ = event_sender
106                                        .send(DriverEvent {
107                                            create_date: chrono::Utc::now(),
108                                            content: DriverEventContent::Web3RpcMessage(
109                                                Web3RpcPoolInfo {
110                                                    chain_id: self.chain_id,
111                                                    content: Web3RpcPoolContent::Success,
112                                                },
113                                            ),
114                                        })
115                                        .await;
116                                }
117                                return Err(web3::Error::Rpc(e));
118                            } else {
119                                log::warn!(
120                                    "Unknown RPC error when calling {} from endpoint {}: {}",
121                                    EthMethodCall::METHOD,
122                                    self.get_name(idx),
123                                    e
124                                );
125                                self.mark_rpc_error(
126                                    idx,
127                                    EthMethodCall::METHOD.to_string(),
128                                    VerifyEndpointResult::RpcWeb3Error(e.to_string()),
129                                );
130                                web3::Error::Rpc(e)
131                            }
132                        }
133                        _ => {
134                            log::warn!(
135                                "Error doing call {} from endpoint {}: {}",
136                                EthMethodCall::METHOD,
137                                self.get_name(idx),
138                                e
139                            );
140                            self.mark_rpc_error(
141                                idx,
142                                EthMethodCall::METHOD.to_string(),
143                                VerifyEndpointResult::OtherNetworkError(e.to_string()),
144                            );
145                            e
146                        }
147                    },
148                    Err(e) => {
149                        log::warn!(
150                            "Timeout when getting data from endpoint {}: {}",
151                            self.get_name(idx),
152                            e
153                        );
154                        self.mark_rpc_error(
155                            idx,
156                            EthMethodCall::METHOD.to_string(),
157                            VerifyEndpointResult::Unreachable,
158                        );
159                        web3::Error::Unreachable
160                    }
161                };
162                if loop_no >= LOOP_COUNT {
163                    if let Some(event_sender) =
164                        self.event_sender.clone().and_then(|es| es.upgrade())
165                    {
166                        let _ = event_sender
167                            .send(DriverEvent {
168                                create_date: chrono::Utc::now(),
169                                content: DriverEventContent::Web3RpcMessage(Web3RpcPoolInfo {
170                                    chain_id: self.chain_id,
171                                    content: Web3RpcPoolContent::Error(format!(
172                                        "Web3 rpc call failed {}",
173                                        err
174                                    )),
175                                }),
176                            })
177                            .await;
178                    }
179                    return Err(err);
180                }
181                // sleep for 800, 1200, 2000, 2800 ms - total max sleep time is 6800 ms
182                let sleep_times: [u64; LOOP_COUNT] = [800, 1200, 2000, 2800];
183                tokio::time::sleep(Duration::from_millis(sleep_times[loop_no])).await;
184                loop_no += 1;
185            }
186        }
187    }
188}