Skip to main content

ethrex_rpc/clients/eth/
mod.rs

1use std::collections::BTreeMap;
2
3use crate::{
4    eth::client::EthConfigResponse,
5    mempool::MempoolContent,
6    types::{
7        block::RpcBlock,
8        block_identifier::BlockIdentifier,
9        receipt::{RpcLog, RpcReceipt},
10        transaction::RpcTransaction,
11    },
12    utils::{RpcRequest, RpcResponse},
13};
14use bytes::Bytes;
15use errors::{EthClientError, RpcRequestError};
16use ethrex_common::{
17    Address, H256, U256,
18    types::{
19        AuthorizationTupleEntry, BlobsBundle, Block, GenericTransaction, TxKind,
20        block_execution_witness::RpcExecutionWitness,
21    },
22    utils::decode_hex,
23};
24use ethrex_rlp::decode::RLPDecode;
25use reqwest::{Client, Url};
26use serde_json::{Value, json};
27use tracing::{debug, trace, warn};
28
29pub mod errors;
30
31#[derive(Debug, Clone)]
32pub struct EthClient {
33    client: Client,
34    pub urls: Vec<Url>,
35    pub max_number_of_retries: u64,
36    pub backoff_factor: u64,
37    pub min_retry_delay: u64,
38    pub max_retry_delay: u64,
39    pub maximum_allowed_max_fee_per_gas: Option<u64>,
40    pub maximum_allowed_max_fee_per_blob_gas: Option<u64>,
41}
42
43#[derive(Default, Clone, Debug)]
44pub struct Overrides {
45    pub from: Option<Address>,
46    pub to: Option<TxKind>,
47    pub value: Option<U256>,
48    pub nonce: Option<u64>,
49    pub chain_id: Option<u64>,
50    pub gas_limit: Option<u64>,
51    pub max_fee_per_gas: Option<u64>,
52    pub max_priority_fee_per_gas: Option<u64>,
53    pub access_list: Vec<(Address, Vec<H256>)>,
54    pub fee_token: Option<Address>,
55    pub gas_price_per_blob: Option<U256>,
56    pub block: Option<BlockIdentifier>,
57    pub blobs_bundle: Option<BlobsBundle>,
58    pub authorization_list: Option<Vec<AuthorizationTupleEntry>>,
59    pub wrapper_version: Option<u8>,
60}
61
62pub const MAX_NUMBER_OF_RETRIES: u64 = 10;
63pub const BACKOFF_FACTOR: u64 = 2;
64// Give at least 8 blocks before trying to bump gas.
65pub const MIN_RETRY_DELAY: u64 = 96;
66pub const MAX_RETRY_DELAY: u64 = 1800;
67
68// 0x08c379a0 == Error(String)
69pub const ERROR_FUNCTION_SELECTOR: [u8; 4] = [0x08, 0xc3, 0x79, 0xa0];
70
71impl EthClient {
72    pub fn new(url: Url) -> Result<EthClient, EthClientError> {
73        Self::new_with_config(
74            vec![url],
75            MAX_NUMBER_OF_RETRIES,
76            BACKOFF_FACTOR,
77            MIN_RETRY_DELAY,
78            MAX_RETRY_DELAY,
79            None,
80            None,
81        )
82    }
83
84    pub fn new_with_config(
85        urls: Vec<Url>,
86        max_number_of_retries: u64,
87        backoff_factor: u64,
88        min_retry_delay: u64,
89        max_retry_delay: u64,
90        maximum_allowed_max_fee_per_gas: Option<u64>,
91        maximum_allowed_max_fee_per_blob_gas: Option<u64>,
92    ) -> Result<Self, EthClientError> {
93        Ok(Self {
94            client: Client::new(),
95            urls,
96            max_number_of_retries,
97            backoff_factor,
98            min_retry_delay,
99            max_retry_delay,
100            maximum_allowed_max_fee_per_gas,
101            maximum_allowed_max_fee_per_blob_gas,
102        })
103    }
104
105    pub fn new_with_multiple_urls(urls: Vec<Url>) -> Result<EthClient, EthClientError> {
106        Self::new_with_config(
107            urls,
108            MAX_NUMBER_OF_RETRIES,
109            BACKOFF_FACTOR,
110            MIN_RETRY_DELAY,
111            MAX_RETRY_DELAY,
112            None,
113            None,
114        )
115    }
116
117    /// Send a request to the RPC. Tries each URL until one succeeds.
118    pub async fn send_request(&self, request: RpcRequest) -> Result<RpcResponse, EthClientError> {
119        let mut response = Err(EthClientError::FailedAllRPC);
120
121        for url in self.urls.iter() {
122            response = self.send_request_to_url(url, &request).await;
123            // Some RPC servers don't implement all the endpoints or don't implement them completely/correctly
124            // so if the server returns Ok(RpcResponse::Error) we retry with the others
125            match &response {
126                Ok(RpcResponse::Success(_)) => {
127                    debug!(endpoint = %url, "RPC request successful");
128                    return response;
129                }
130                Ok(RpcResponse::Error(err)) => {
131                    debug!(endpoint = %url, error = ?err.error, "RPC server returned an error");
132                }
133                Err(error) => {
134                    warn!(endpoint = %url, %error, "Could not request RPC server");
135                }
136            }
137        }
138
139        response
140    }
141
142    /// Send a request to **all** RPC URLs.
143    ///
144    /// Return the first successful response, or the last error if all fail.
145    async fn send_request_to_all(
146        &self,
147        request: RpcRequest,
148    ) -> Result<RpcResponse, EthClientError> {
149        let mut response = Err(EthClientError::FailedAllRPC);
150
151        for url in self.urls.iter() {
152            let maybe_response = self.send_request_to_url(url, &request).await;
153
154            match &maybe_response {
155                Ok(RpcResponse::Success(_)) => {
156                    debug!(endpoint = %url, "RPC request successful");
157                }
158                Ok(RpcResponse::Error(err)) => {
159                    debug!(endpoint = %url, error = ?err.error, "RPC server returned an error");
160                }
161                Err(error) => {
162                    warn!(endpoint = %url, %error, "Could not request RPC server");
163                }
164            };
165
166            response = response.or(maybe_response);
167        }
168
169        response
170    }
171
172    /// Send a request to a specific URL.
173    async fn send_request_to_url(
174        &self,
175        rpc_url: &Url,
176        request: &RpcRequest,
177    ) -> Result<RpcResponse, EthClientError> {
178        let id = uuid::Uuid::new_v4();
179        trace!(endpoint = %rpc_url, ?request, %id, "Sending RPC request");
180
181        self.client
182            .post(rpc_url.as_str())
183            .header("content-type", "application/json")
184            .body(serde_json::ser::to_string(&request).map_err(|error| {
185                EthClientError::FailedToSerializeRequestBody(format!("{error}: {request:?}"))
186            })?)
187            .send()
188            .await
189            .inspect(|_| trace!(endpoint = %rpc_url, %id, "Request finished successfully"))?
190            .json::<RpcResponse>()
191            .await
192            .inspect(|body| trace!(endpoint = %rpc_url, %id, ?body, "Response deserialized successfully"))
193            .inspect_err(|err| trace!(endpoint = %rpc_url, %id, %err, "Failed to deserialize response"))
194            .map_err(EthClientError::from)
195    }
196
197    /// Helper to send a request and parse the response, handling errors uniformly.
198    /// Extracts the method name from the request for error reporting.
199    pub async fn send_request_parsed<T: serde::de::DeserializeOwned>(
200        &self,
201        request: RpcRequest,
202    ) -> Result<T, EthClientError> {
203        let method = request.method.clone();
204        match self.send_request(request).await? {
205            RpcResponse::Success(result) => serde_json::from_value(result.result)
206                .map_err(|e| RpcRequestError::SerdeJSONError { method, source: e })
207                .map_err(EthClientError::from),
208            RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
209                method,
210                message: error_response.error.message,
211                data: error_response.error.data,
212            }
213            .into()),
214        }
215    }
216
217    /// Helper to send a request to all URLs and parse the response.
218    async fn send_request_to_all_parsed<T: serde::de::DeserializeOwned>(
219        &self,
220        request: RpcRequest,
221    ) -> Result<T, EthClientError> {
222        let method = request.method.clone();
223        match self.send_request_to_all(request).await? {
224            RpcResponse::Success(result) => serde_json::from_value(result.result)
225                .map_err(|e| RpcRequestError::SerdeJSONError { method, source: e })
226                .map_err(EthClientError::from),
227            RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
228                method,
229                message: error_response.error.message,
230                data: error_response.error.data,
231            }
232            .into()),
233        }
234    }
235
236    pub async fn send_raw_transaction(&self, data: &[u8]) -> Result<H256, EthClientError> {
237        let params = Some(vec![json!("0x".to_string() + &hex::encode(data))]);
238        let request = RpcRequest::new("eth_sendRawTransaction", params);
239        self.send_request_to_all_parsed(request).await
240    }
241
242    pub async fn estimate_gas(
243        &self,
244        transaction: GenericTransaction,
245    ) -> Result<u64, EthClientError> {
246        let to = match transaction.to {
247            TxKind::Call(addr) => Some(format!("{addr:#x}")),
248            TxKind::Create => None,
249        };
250
251        let mut data = json!({
252            "to": to,
253            "input": format!("0x{:#x}", transaction.input),
254            "from": format!("{:#x}", transaction.from),
255            "value": format!("{:#x}", transaction.value),
256
257        });
258
259        if !transaction.blob_versioned_hashes.is_empty() {
260            let blob_versioned_hashes_str: Vec<_> = transaction
261                .blob_versioned_hashes
262                .into_iter()
263                .map(|hash| format!("{hash:#x}"))
264                .collect();
265
266            data.as_object_mut()
267                .ok_or_else(|| {
268                    EthClientError::Custom("Failed to mutate data in estimate_gas".to_owned())
269                })?
270                .insert(
271                    "blobVersionedHashes".to_owned(),
272                    json!(blob_versioned_hashes_str),
273                );
274        }
275
276        if !transaction.blobs.is_empty() {
277            let blobs_str: Vec<_> = transaction
278                .blobs
279                .into_iter()
280                .map(|blob| format!("0x{}", hex::encode(blob)))
281                .collect();
282
283            data.as_object_mut()
284                .ok_or_else(|| {
285                    EthClientError::Custom("Failed to mutate data in estimate_gas".to_owned())
286                })?
287                .insert("blobs".to_owned(), json!(blobs_str));
288        }
289
290        // Add the nonce just if present, otherwise the RPC will use the latest nonce
291        if let Some(nonce) = transaction.nonce
292            && let Value::Object(ref mut map) = data
293        {
294            map.insert("nonce".to_owned(), json!(format!("{nonce:#x}")));
295        }
296
297        let request = RpcRequest::new("eth_estimateGas", Some(vec![data, json!("latest")]));
298
299        match self.send_request(request).await? {
300            RpcResponse::Success(result) => {
301                let res = serde_json::from_value::<String>(result.result).map_err(|e| {
302                    RpcRequestError::SerdeJSONError {
303                        method: "eth_estimateGas".to_string(),
304                        source: e,
305                    }
306                })?;
307                let res = res.get(2..).ok_or(RpcRequestError::Custom(
308                    "Failed to slice index response in estimate_gas".to_owned(),
309                ))?;
310                u64::from_str_radix(res, 16)
311            }
312            .map_err(|e| RpcRequestError::ParseIntError {
313                method: "eth_estimateGas".to_string(),
314                source: e,
315            })
316            .map_err(EthClientError::from),
317            RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
318                method: "eth_estimateGas".to_string(),
319                message: error_response.error.message,
320                data: error_response.error.data,
321            }
322            .into()),
323        }
324    }
325
326    pub async fn call(
327        &self,
328        to: Address,
329        calldata: Bytes,
330        overrides: Overrides,
331    ) -> Result<String, EthClientError> {
332        let tx = GenericTransaction {
333            to: TxKind::Call(to),
334            input: calldata,
335            value: overrides.value.unwrap_or_default(),
336            from: overrides.from.unwrap_or_default(),
337            gas: overrides.gas_limit,
338            gas_price: U256::from(overrides.max_fee_per_gas.unwrap_or_default()),
339            ..Default::default()
340        };
341        let mut tx_json = json!({
342            "to": match tx.to {
343                TxKind::Call(addr) => format!("{addr:#x}"),
344                TxKind::Create => format!("{:#x}", Address::zero()),
345            },
346            "input": format!("0x{:#x}", tx.input),
347            "value": format!("{:#x}", tx.value),
348            "from": format!("{:#x}", tx.from),
349        });
350        if let Some(nonce) = overrides.nonce {
351            tx_json["nonce"] = json!(format!("{nonce:#x}"));
352        }
353        let params = Some(vec![
354            tx_json,
355            overrides
356                .block
357                .map(Into::into)
358                .unwrap_or(serde_json::Value::String("latest".to_string())),
359        ]);
360
361        let request = RpcRequest::new("eth_call", params);
362        self.send_request_parsed(request).await
363    }
364
365    pub async fn get_max_priority_fee(&self) -> Result<U256, EthClientError> {
366        let request = RpcRequest::new("eth_maxPriorityFeePerGas", None);
367        self.send_request_parsed(request).await
368    }
369
370    pub async fn get_gas_price(&self) -> Result<U256, EthClientError> {
371        let request = RpcRequest::new("eth_gasPrice", None);
372        self.send_request_parsed(request).await
373    }
374
375    pub async fn get_gas_price_with_extra(
376        &self,
377        bump_percent: u64,
378    ) -> Result<U256, EthClientError> {
379        let gas_price = self.get_gas_price().await?;
380
381        Ok((gas_price * (100 + bump_percent)) / 100)
382    }
383
384    pub async fn get_nonce(
385        &self,
386        address: Address,
387        block: BlockIdentifier,
388    ) -> Result<u64, EthClientError> {
389        let params = Some(vec![json!(format!("{address:#x}")), block.into()]);
390        let request = RpcRequest::new("eth_getTransactionCount", params);
391
392        match self.send_request(request).await? {
393            RpcResponse::Success(result) => u64::from_str_radix(
394                serde_json::from_value::<String>(result.result)
395                    .map_err(|e| RpcRequestError::SerdeJSONError {
396                        method: "eth_getTransactionCount".to_string(),
397                        source: e,
398                    })?
399                    .get(2..)
400                    .ok_or(EthClientError::Custom(
401                        "Failed to deserialize get_nonce request".to_owned(),
402                    ))?,
403                16,
404            )
405            .map_err(|e| RpcRequestError::ParseIntError {
406                method: "eth_getTransactionCount".to_string(),
407                source: e,
408            })
409            .map_err(EthClientError::from),
410            RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
411                method: "eth_getTransactionCount".to_string(),
412                message: error_response.error.message,
413                data: error_response.error.data,
414            }
415            .into()),
416        }
417    }
418
419    pub async fn get_block_number(&self) -> Result<u64, EthClientError> {
420        let request = RpcRequest::new("eth_blockNumber", None);
421        let block_number: U256 = self.send_request_parsed(request).await?;
422        u64::try_from(block_number)
423            .map_err(|_| EthClientError::Custom("block number overflows u64".to_owned()))
424    }
425
426    pub async fn get_block_by_hash(&self, block_hash: H256) -> Result<RpcBlock, EthClientError> {
427        let params = Some(vec![json!(block_hash), json!(true)]);
428        let request = RpcRequest::new("eth_getBlockByHash", params);
429        self.send_request_parsed(request).await
430    }
431
432    pub async fn peer_count(&self) -> Result<U256, EthClientError> {
433        let request = RpcRequest::new("net_peerCount", Some(vec![]));
434        self.send_request_parsed(request).await
435    }
436
437    /// Fetches a block from the Ethereum blockchain by its number or the latest/earliest/pending block.
438    /// If no `block_number` is provided, get the latest.
439    pub async fn get_block_by_number(
440        &self,
441        block: BlockIdentifier,
442        hydrated: bool,
443    ) -> Result<RpcBlock, EthClientError> {
444        let params = Some(vec![block.into(), json!(hydrated)]);
445        let request = RpcRequest::new("eth_getBlockByNumber", params);
446        self.send_request_parsed(request).await
447    }
448
449    pub async fn get_raw_block(&self, block: BlockIdentifier) -> Result<Block, EthClientError> {
450        let request = RpcRequest::new("debug_getRawBlock", Some(vec![block.into()]));
451
452        let encoded_block: Result<String, _> = match self.send_request(request).await? {
453            RpcResponse::Success(result) => {
454                serde_json::from_value(result.result).map_err(|e| RpcRequestError::SerdeJSONError {
455                    method: "debug_getRawBlock".to_string(),
456                    source: e,
457                })
458            }
459            RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
460                method: "debug_getRawBlock".to_string(),
461                message: error_response.error.message,
462                data: error_response.error.data,
463            }),
464        };
465
466        let encoded_block = decode_hex(&encoded_block?)
467            .map_err(|e| EthClientError::Custom(format!("Failed to decode hex: {e}")))?;
468
469        let block = Block::decode_unfinished(&encoded_block).map_err(|e| {
470            RpcRequestError::RLPDecodeError {
471                method: "debug_getRawBlock".to_string(),
472                message: e.to_string(),
473            }
474        })?;
475        Ok(block.0)
476    }
477
478    pub async fn get_logs(
479        &self,
480        from_block: U256,
481        to_block: U256,
482        address: Address,
483        topics: Vec<H256>,
484    ) -> Result<Vec<RpcLog>, EthClientError> {
485        let params = Some(vec![serde_json::json!(
486            {
487                "fromBlock": format!("{:#x}", from_block),
488                "toBlock": format!("{:#x}", to_block),
489                "address": format!("{:#x}", address),
490                "topics": topics.iter().map(|topic| format!("{topic:#x}")).collect::<Vec<_>>()
491            }
492        )]);
493        let request = RpcRequest::new("eth_getLogs", params);
494        self.send_request_parsed(request).await
495    }
496
497    pub async fn get_transaction_receipt(
498        &self,
499        tx_hash: H256,
500    ) -> Result<Option<RpcReceipt>, EthClientError> {
501        let params = Some(vec![json!(format!("{:#x}", tx_hash))]);
502        let request = RpcRequest::new("eth_getTransactionReceipt", params);
503        self.send_request_parsed(request).await
504    }
505
506    pub async fn get_balance(
507        &self,
508        address: Address,
509        block: BlockIdentifier,
510    ) -> Result<U256, EthClientError> {
511        let params = Some(vec![json!(format!("{:#x}", address)), block.into()]);
512        let request = RpcRequest::new("eth_getBalance", params);
513        self.send_request_parsed(request).await
514    }
515
516    pub async fn get_storage_at(
517        &self,
518        address: Address,
519        slot: U256,
520        block: BlockIdentifier,
521    ) -> Result<U256, EthClientError> {
522        let params = Some(vec![
523            json!(format!("{:#x}", address)),
524            json!(format!("{:#x}", slot)),
525            block.into(),
526        ]);
527        let request = RpcRequest::new("eth_getStorageAt", params);
528        self.send_request_parsed(request).await
529    }
530
531    pub async fn get_chain_id(&self) -> Result<U256, EthClientError> {
532        let request = RpcRequest::new("eth_chainId", None);
533        self.send_request_parsed(request).await
534    }
535
536    pub async fn get_eth_config(&self) -> Result<EthConfigResponse, EthClientError> {
537        let request = RpcRequest::new("eth_config", None);
538        self.send_request_parsed(request).await
539    }
540
541    pub async fn get_code(
542        &self,
543        address: Address,
544        block: BlockIdentifier,
545    ) -> Result<Bytes, EthClientError> {
546        let params = Some(vec![json!(format!("{:#x}", address)), block.into()]);
547        let request = RpcRequest::new("eth_getCode", params);
548
549        match self.send_request(request).await? {
550            RpcResponse::Success(result) => hex::decode(
551                &serde_json::from_value::<String>(result.result)
552                    .map(|hex_str| {
553                        hex_str
554                            .strip_prefix("0x")
555                            .map(ToString::to_string)
556                            .unwrap_or(hex_str)
557                    })
558                    .map_err(|e| RpcRequestError::SerdeJSONError {
559                        method: "eth_getCode".to_string(),
560                        source: e,
561                    })
562                    .map_err(EthClientError::from)?,
563            )
564            .map(Into::into)
565            .map_err(|e| RpcRequestError::HexError {
566                method: "eth_getCode".to_string(),
567                source: e,
568            })
569            .map_err(EthClientError::from),
570            RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
571                method: "eth_getCode".to_string(),
572                message: error_response.error.message,
573                data: error_response.error.data,
574            }
575            .into()),
576        }
577    }
578
579    pub async fn get_transaction_by_hash(
580        &self,
581        tx_hash: H256,
582    ) -> Result<Option<RpcTransaction>, EthClientError> {
583        let params = Some(vec![json!(format!("{tx_hash:#x}"))]);
584        let request = RpcRequest::new("eth_getTransactionByHash", params);
585        self.send_request_parsed(request).await
586    }
587
588    /// Fethches the execution witnes for a given block or range of blocks.
589    /// WARNNING: This method is only compatible with ethrex and not with other debug_executionWitness implementations.
590    pub async fn get_witness(
591        &self,
592        from: BlockIdentifier,
593        to: Option<BlockIdentifier>,
594    ) -> Result<RpcExecutionWitness, EthClientError> {
595        let params = if let Some(to_block) = to {
596            Some(vec![from.into(), to_block.into()])
597        } else {
598            Some(vec![from.into()])
599        };
600
601        let request = RpcRequest::new("debug_executionWitness", params);
602        self.send_request_parsed(request).await
603    }
604
605    pub async fn tx_pool_content(&self) -> Result<MempoolContent, EthClientError> {
606        let request = RpcRequest::new("txpool_content", None);
607        self.send_request_parsed(request).await
608    }
609
610    pub async fn get_blob_base_fee(&self, block: BlockIdentifier) -> Result<u64, EthClientError> {
611        let params = Some(vec![block.into()]);
612        let request = RpcRequest::new("eth_blobBaseFee", params);
613
614        match self.send_request(request).await? {
615            RpcResponse::Success(result) => Ok(u64::from_str_radix(
616                serde_json::from_value::<String>(result.result)
617                    .map_err(|e| RpcRequestError::SerdeJSONError {
618                        method: "eth_blobBaseFee".to_string(),
619                        source: e,
620                    })?
621                    .trim_start_matches("0x"),
622                16,
623            )
624            .map_err(|e| RpcRequestError::ParseIntError {
625                method: "eth_blobBaseFee".to_string(),
626                source: e,
627            })?),
628            RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
629                method: "eth_blobBaseFee".to_string(),
630                message: error_response.error.message,
631                data: error_response.error.data,
632            }
633            .into()),
634        }
635    }
636
637    /// Smoke test the all the urls by calling eth_blockNumber
638    pub async fn test_urls(&self) -> BTreeMap<String, serde_json::Value> {
639        let mut map = BTreeMap::new();
640        for url in self.urls.iter() {
641            let response = match self
642                .send_request_to_url(url, &RpcRequest::new("eth_blockNumber", None))
643                .await
644            {
645                Ok(RpcResponse::Success(ok)) => serde_json::to_value(ok).unwrap_or_else(|e| {
646                    serde_json::Value::String(format!("Failed to serialize success response: {e}"))
647                }),
648                Ok(RpcResponse::Error(e)) => serde_json::to_value(e).unwrap_or_else(|e| {
649                    serde_json::Value::String(format!("Failed to serialize error response: {e}"))
650                }),
651                Err(e) => serde_json::Value::String(format!("Request error: {e}")),
652            };
653            map.insert(url.to_string(), response);
654        }
655        map
656    }
657}