1use std::collections::BTreeMap;
2
3use crate::{
4 eth::client::EthConfigResponse,
5 mempool::MempoolContent,
6 types::{
7 block::RpcBlock,
8 block_identifier::BlockIdentifier,
9 receipt::{RpcLog, RpcReceipt},
10 transaction::RpcTransaction,
11 },
12 utils::{RpcRequest, RpcResponse},
13};
14use bytes::Bytes;
15use errors::{EthClientError, RpcRequestError};
16use ethrex_common::{
17 Address, H256, U256,
18 types::{
19 AuthorizationTupleEntry, BlobsBundle, Block, GenericTransaction, TxKind,
20 block_execution_witness::RpcExecutionWitness,
21 },
22 utils::decode_hex,
23};
24use ethrex_rlp::decode::RLPDecode;
25use reqwest::{Client, Url};
26use serde_json::{Value, json};
27use tracing::{debug, trace, warn};
28
29pub mod errors;
30
31#[derive(Debug, Clone)]
32pub struct EthClient {
33 client: Client,
34 pub urls: Vec<Url>,
35 pub max_number_of_retries: u64,
36 pub backoff_factor: u64,
37 pub min_retry_delay: u64,
38 pub max_retry_delay: u64,
39 pub maximum_allowed_max_fee_per_gas: Option<u64>,
40 pub maximum_allowed_max_fee_per_blob_gas: Option<u64>,
41}
42
43#[derive(Default, Clone, Debug)]
44pub struct Overrides {
45 pub from: Option<Address>,
46 pub to: Option<TxKind>,
47 pub value: Option<U256>,
48 pub nonce: Option<u64>,
49 pub chain_id: Option<u64>,
50 pub gas_limit: Option<u64>,
51 pub max_fee_per_gas: Option<u64>,
52 pub max_priority_fee_per_gas: Option<u64>,
53 pub access_list: Vec<(Address, Vec<H256>)>,
54 pub fee_token: Option<Address>,
55 pub gas_price_per_blob: Option<U256>,
56 pub block: Option<BlockIdentifier>,
57 pub blobs_bundle: Option<BlobsBundle>,
58 pub authorization_list: Option<Vec<AuthorizationTupleEntry>>,
59 pub wrapper_version: Option<u8>,
60}
61
62pub const MAX_NUMBER_OF_RETRIES: u64 = 10;
63pub const BACKOFF_FACTOR: u64 = 2;
64pub const MIN_RETRY_DELAY: u64 = 96;
66pub const MAX_RETRY_DELAY: u64 = 1800;
67
68pub const ERROR_FUNCTION_SELECTOR: [u8; 4] = [0x08, 0xc3, 0x79, 0xa0];
70
71impl EthClient {
72 pub fn new(url: Url) -> Result<EthClient, EthClientError> {
73 Self::new_with_config(
74 vec![url],
75 MAX_NUMBER_OF_RETRIES,
76 BACKOFF_FACTOR,
77 MIN_RETRY_DELAY,
78 MAX_RETRY_DELAY,
79 None,
80 None,
81 )
82 }
83
84 pub fn new_with_config(
85 urls: Vec<Url>,
86 max_number_of_retries: u64,
87 backoff_factor: u64,
88 min_retry_delay: u64,
89 max_retry_delay: u64,
90 maximum_allowed_max_fee_per_gas: Option<u64>,
91 maximum_allowed_max_fee_per_blob_gas: Option<u64>,
92 ) -> Result<Self, EthClientError> {
93 Ok(Self {
94 client: Client::new(),
95 urls,
96 max_number_of_retries,
97 backoff_factor,
98 min_retry_delay,
99 max_retry_delay,
100 maximum_allowed_max_fee_per_gas,
101 maximum_allowed_max_fee_per_blob_gas,
102 })
103 }
104
105 pub fn new_with_multiple_urls(urls: Vec<Url>) -> Result<EthClient, EthClientError> {
106 Self::new_with_config(
107 urls,
108 MAX_NUMBER_OF_RETRIES,
109 BACKOFF_FACTOR,
110 MIN_RETRY_DELAY,
111 MAX_RETRY_DELAY,
112 None,
113 None,
114 )
115 }
116
117 pub async fn send_request(&self, request: RpcRequest) -> Result<RpcResponse, EthClientError> {
119 let mut response = Err(EthClientError::FailedAllRPC);
120
121 for url in self.urls.iter() {
122 response = self.send_request_to_url(url, &request).await;
123 match &response {
126 Ok(RpcResponse::Success(_)) => {
127 debug!(endpoint = %url, "RPC request successful");
128 return response;
129 }
130 Ok(RpcResponse::Error(err)) => {
131 debug!(endpoint = %url, error = ?err.error, "RPC server returned an error");
132 }
133 Err(error) => {
134 warn!(endpoint = %url, %error, "Could not request RPC server");
135 }
136 }
137 }
138
139 response
140 }
141
142 async fn send_request_to_all(
146 &self,
147 request: RpcRequest,
148 ) -> Result<RpcResponse, EthClientError> {
149 let mut response = Err(EthClientError::FailedAllRPC);
150
151 for url in self.urls.iter() {
152 let maybe_response = self.send_request_to_url(url, &request).await;
153
154 match &maybe_response {
155 Ok(RpcResponse::Success(_)) => {
156 debug!(endpoint = %url, "RPC request successful");
157 }
158 Ok(RpcResponse::Error(err)) => {
159 debug!(endpoint = %url, error = ?err.error, "RPC server returned an error");
160 }
161 Err(error) => {
162 warn!(endpoint = %url, %error, "Could not request RPC server");
163 }
164 };
165
166 response = response.or(maybe_response);
167 }
168
169 response
170 }
171
172 async fn send_request_to_url(
174 &self,
175 rpc_url: &Url,
176 request: &RpcRequest,
177 ) -> Result<RpcResponse, EthClientError> {
178 let id = uuid::Uuid::new_v4();
179 trace!(endpoint = %rpc_url, ?request, %id, "Sending RPC request");
180
181 self.client
182 .post(rpc_url.as_str())
183 .header("content-type", "application/json")
184 .body(serde_json::ser::to_string(&request).map_err(|error| {
185 EthClientError::FailedToSerializeRequestBody(format!("{error}: {request:?}"))
186 })?)
187 .send()
188 .await
189 .inspect(|_| trace!(endpoint = %rpc_url, %id, "Request finished successfully"))?
190 .json::<RpcResponse>()
191 .await
192 .inspect(|body| trace!(endpoint = %rpc_url, %id, ?body, "Response deserialized successfully"))
193 .inspect_err(|err| trace!(endpoint = %rpc_url, %id, %err, "Failed to deserialize response"))
194 .map_err(EthClientError::from)
195 }
196
197 pub async fn send_request_parsed<T: serde::de::DeserializeOwned>(
200 &self,
201 request: RpcRequest,
202 ) -> Result<T, EthClientError> {
203 let method = request.method.clone();
204 match self.send_request(request).await? {
205 RpcResponse::Success(result) => serde_json::from_value(result.result)
206 .map_err(|e| RpcRequestError::SerdeJSONError { method, source: e })
207 .map_err(EthClientError::from),
208 RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
209 method,
210 message: error_response.error.message,
211 data: error_response.error.data,
212 }
213 .into()),
214 }
215 }
216
217 async fn send_request_to_all_parsed<T: serde::de::DeserializeOwned>(
219 &self,
220 request: RpcRequest,
221 ) -> Result<T, EthClientError> {
222 let method = request.method.clone();
223 match self.send_request_to_all(request).await? {
224 RpcResponse::Success(result) => serde_json::from_value(result.result)
225 .map_err(|e| RpcRequestError::SerdeJSONError { method, source: e })
226 .map_err(EthClientError::from),
227 RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
228 method,
229 message: error_response.error.message,
230 data: error_response.error.data,
231 }
232 .into()),
233 }
234 }
235
236 pub async fn send_raw_transaction(&self, data: &[u8]) -> Result<H256, EthClientError> {
237 let params = Some(vec![json!("0x".to_string() + &hex::encode(data))]);
238 let request = RpcRequest::new("eth_sendRawTransaction", params);
239 self.send_request_to_all_parsed(request).await
240 }
241
242 pub async fn estimate_gas(
243 &self,
244 transaction: GenericTransaction,
245 ) -> Result<u64, EthClientError> {
246 let to = match transaction.to {
247 TxKind::Call(addr) => Some(format!("{addr:#x}")),
248 TxKind::Create => None,
249 };
250
251 let mut data = json!({
252 "to": to,
253 "input": format!("0x{:#x}", transaction.input),
254 "from": format!("{:#x}", transaction.from),
255 "value": format!("{:#x}", transaction.value),
256
257 });
258
259 if !transaction.blob_versioned_hashes.is_empty() {
260 let blob_versioned_hashes_str: Vec<_> = transaction
261 .blob_versioned_hashes
262 .into_iter()
263 .map(|hash| format!("{hash:#x}"))
264 .collect();
265
266 data.as_object_mut()
267 .ok_or_else(|| {
268 EthClientError::Custom("Failed to mutate data in estimate_gas".to_owned())
269 })?
270 .insert(
271 "blobVersionedHashes".to_owned(),
272 json!(blob_versioned_hashes_str),
273 );
274 }
275
276 if !transaction.blobs.is_empty() {
277 let blobs_str: Vec<_> = transaction
278 .blobs
279 .into_iter()
280 .map(|blob| format!("0x{}", hex::encode(blob)))
281 .collect();
282
283 data.as_object_mut()
284 .ok_or_else(|| {
285 EthClientError::Custom("Failed to mutate data in estimate_gas".to_owned())
286 })?
287 .insert("blobs".to_owned(), json!(blobs_str));
288 }
289
290 if let Some(nonce) = transaction.nonce
292 && let Value::Object(ref mut map) = data
293 {
294 map.insert("nonce".to_owned(), json!(format!("{nonce:#x}")));
295 }
296
297 let request = RpcRequest::new("eth_estimateGas", Some(vec![data, json!("latest")]));
298
299 match self.send_request(request).await? {
300 RpcResponse::Success(result) => {
301 let res = serde_json::from_value::<String>(result.result).map_err(|e| {
302 RpcRequestError::SerdeJSONError {
303 method: "eth_estimateGas".to_string(),
304 source: e,
305 }
306 })?;
307 let res = res.get(2..).ok_or(RpcRequestError::Custom(
308 "Failed to slice index response in estimate_gas".to_owned(),
309 ))?;
310 u64::from_str_radix(res, 16)
311 }
312 .map_err(|e| RpcRequestError::ParseIntError {
313 method: "eth_estimateGas".to_string(),
314 source: e,
315 })
316 .map_err(EthClientError::from),
317 RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
318 method: "eth_estimateGas".to_string(),
319 message: error_response.error.message,
320 data: error_response.error.data,
321 }
322 .into()),
323 }
324 }
325
326 pub async fn call(
327 &self,
328 to: Address,
329 calldata: Bytes,
330 overrides: Overrides,
331 ) -> Result<String, EthClientError> {
332 let tx = GenericTransaction {
333 to: TxKind::Call(to),
334 input: calldata,
335 value: overrides.value.unwrap_or_default(),
336 from: overrides.from.unwrap_or_default(),
337 gas: overrides.gas_limit,
338 gas_price: U256::from(overrides.max_fee_per_gas.unwrap_or_default()),
339 ..Default::default()
340 };
341 let mut tx_json = json!({
342 "to": match tx.to {
343 TxKind::Call(addr) => format!("{addr:#x}"),
344 TxKind::Create => format!("{:#x}", Address::zero()),
345 },
346 "input": format!("0x{:#x}", tx.input),
347 "value": format!("{:#x}", tx.value),
348 "from": format!("{:#x}", tx.from),
349 });
350 if let Some(nonce) = overrides.nonce {
351 tx_json["nonce"] = json!(format!("{nonce:#x}"));
352 }
353 let params = Some(vec![
354 tx_json,
355 overrides
356 .block
357 .map(Into::into)
358 .unwrap_or(serde_json::Value::String("latest".to_string())),
359 ]);
360
361 let request = RpcRequest::new("eth_call", params);
362 self.send_request_parsed(request).await
363 }
364
365 pub async fn get_max_priority_fee(&self) -> Result<U256, EthClientError> {
366 let request = RpcRequest::new("eth_maxPriorityFeePerGas", None);
367 self.send_request_parsed(request).await
368 }
369
370 pub async fn get_gas_price(&self) -> Result<U256, EthClientError> {
371 let request = RpcRequest::new("eth_gasPrice", None);
372 self.send_request_parsed(request).await
373 }
374
375 pub async fn get_gas_price_with_extra(
376 &self,
377 bump_percent: u64,
378 ) -> Result<U256, EthClientError> {
379 let gas_price = self.get_gas_price().await?;
380
381 Ok((gas_price * (100 + bump_percent)) / 100)
382 }
383
384 pub async fn get_nonce(
385 &self,
386 address: Address,
387 block: BlockIdentifier,
388 ) -> Result<u64, EthClientError> {
389 let params = Some(vec![json!(format!("{address:#x}")), block.into()]);
390 let request = RpcRequest::new("eth_getTransactionCount", params);
391
392 match self.send_request(request).await? {
393 RpcResponse::Success(result) => u64::from_str_radix(
394 serde_json::from_value::<String>(result.result)
395 .map_err(|e| RpcRequestError::SerdeJSONError {
396 method: "eth_getTransactionCount".to_string(),
397 source: e,
398 })?
399 .get(2..)
400 .ok_or(EthClientError::Custom(
401 "Failed to deserialize get_nonce request".to_owned(),
402 ))?,
403 16,
404 )
405 .map_err(|e| RpcRequestError::ParseIntError {
406 method: "eth_getTransactionCount".to_string(),
407 source: e,
408 })
409 .map_err(EthClientError::from),
410 RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
411 method: "eth_getTransactionCount".to_string(),
412 message: error_response.error.message,
413 data: error_response.error.data,
414 }
415 .into()),
416 }
417 }
418
419 pub async fn get_block_number(&self) -> Result<u64, EthClientError> {
420 let request = RpcRequest::new("eth_blockNumber", None);
421 let block_number: U256 = self.send_request_parsed(request).await?;
422 u64::try_from(block_number)
423 .map_err(|_| EthClientError::Custom("block number overflows u64".to_owned()))
424 }
425
426 pub async fn get_block_by_hash(&self, block_hash: H256) -> Result<RpcBlock, EthClientError> {
427 let params = Some(vec![json!(block_hash), json!(true)]);
428 let request = RpcRequest::new("eth_getBlockByHash", params);
429 self.send_request_parsed(request).await
430 }
431
432 pub async fn peer_count(&self) -> Result<U256, EthClientError> {
433 let request = RpcRequest::new("net_peerCount", Some(vec![]));
434 self.send_request_parsed(request).await
435 }
436
437 pub async fn get_block_by_number(
440 &self,
441 block: BlockIdentifier,
442 hydrated: bool,
443 ) -> Result<RpcBlock, EthClientError> {
444 let params = Some(vec![block.into(), json!(hydrated)]);
445 let request = RpcRequest::new("eth_getBlockByNumber", params);
446 self.send_request_parsed(request).await
447 }
448
449 pub async fn get_raw_block(&self, block: BlockIdentifier) -> Result<Block, EthClientError> {
450 let request = RpcRequest::new("debug_getRawBlock", Some(vec![block.into()]));
451
452 let encoded_block: Result<String, _> = match self.send_request(request).await? {
453 RpcResponse::Success(result) => {
454 serde_json::from_value(result.result).map_err(|e| RpcRequestError::SerdeJSONError {
455 method: "debug_getRawBlock".to_string(),
456 source: e,
457 })
458 }
459 RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
460 method: "debug_getRawBlock".to_string(),
461 message: error_response.error.message,
462 data: error_response.error.data,
463 }),
464 };
465
466 let encoded_block = decode_hex(&encoded_block?)
467 .map_err(|e| EthClientError::Custom(format!("Failed to decode hex: {e}")))?;
468
469 let block = Block::decode_unfinished(&encoded_block).map_err(|e| {
470 RpcRequestError::RLPDecodeError {
471 method: "debug_getRawBlock".to_string(),
472 message: e.to_string(),
473 }
474 })?;
475 Ok(block.0)
476 }
477
478 pub async fn get_logs(
479 &self,
480 from_block: U256,
481 to_block: U256,
482 address: Address,
483 topics: Vec<H256>,
484 ) -> Result<Vec<RpcLog>, EthClientError> {
485 let params = Some(vec![serde_json::json!(
486 {
487 "fromBlock": format!("{:#x}", from_block),
488 "toBlock": format!("{:#x}", to_block),
489 "address": format!("{:#x}", address),
490 "topics": topics.iter().map(|topic| format!("{topic:#x}")).collect::<Vec<_>>()
491 }
492 )]);
493 let request = RpcRequest::new("eth_getLogs", params);
494 self.send_request_parsed(request).await
495 }
496
497 pub async fn get_transaction_receipt(
498 &self,
499 tx_hash: H256,
500 ) -> Result<Option<RpcReceipt>, EthClientError> {
501 let params = Some(vec![json!(format!("{:#x}", tx_hash))]);
502 let request = RpcRequest::new("eth_getTransactionReceipt", params);
503 self.send_request_parsed(request).await
504 }
505
506 pub async fn get_balance(
507 &self,
508 address: Address,
509 block: BlockIdentifier,
510 ) -> Result<U256, EthClientError> {
511 let params = Some(vec![json!(format!("{:#x}", address)), block.into()]);
512 let request = RpcRequest::new("eth_getBalance", params);
513 self.send_request_parsed(request).await
514 }
515
516 pub async fn get_storage_at(
517 &self,
518 address: Address,
519 slot: U256,
520 block: BlockIdentifier,
521 ) -> Result<U256, EthClientError> {
522 let params = Some(vec![
523 json!(format!("{:#x}", address)),
524 json!(format!("{:#x}", slot)),
525 block.into(),
526 ]);
527 let request = RpcRequest::new("eth_getStorageAt", params);
528 self.send_request_parsed(request).await
529 }
530
531 pub async fn get_chain_id(&self) -> Result<U256, EthClientError> {
532 let request = RpcRequest::new("eth_chainId", None);
533 self.send_request_parsed(request).await
534 }
535
536 pub async fn get_eth_config(&self) -> Result<EthConfigResponse, EthClientError> {
537 let request = RpcRequest::new("eth_config", None);
538 self.send_request_parsed(request).await
539 }
540
541 pub async fn get_code(
542 &self,
543 address: Address,
544 block: BlockIdentifier,
545 ) -> Result<Bytes, EthClientError> {
546 let params = Some(vec![json!(format!("{:#x}", address)), block.into()]);
547 let request = RpcRequest::new("eth_getCode", params);
548
549 match self.send_request(request).await? {
550 RpcResponse::Success(result) => hex::decode(
551 &serde_json::from_value::<String>(result.result)
552 .map(|hex_str| {
553 hex_str
554 .strip_prefix("0x")
555 .map(ToString::to_string)
556 .unwrap_or(hex_str)
557 })
558 .map_err(|e| RpcRequestError::SerdeJSONError {
559 method: "eth_getCode".to_string(),
560 source: e,
561 })
562 .map_err(EthClientError::from)?,
563 )
564 .map(Into::into)
565 .map_err(|e| RpcRequestError::HexError {
566 method: "eth_getCode".to_string(),
567 source: e,
568 })
569 .map_err(EthClientError::from),
570 RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
571 method: "eth_getCode".to_string(),
572 message: error_response.error.message,
573 data: error_response.error.data,
574 }
575 .into()),
576 }
577 }
578
579 pub async fn get_transaction_by_hash(
580 &self,
581 tx_hash: H256,
582 ) -> Result<Option<RpcTransaction>, EthClientError> {
583 let params = Some(vec![json!(format!("{tx_hash:#x}"))]);
584 let request = RpcRequest::new("eth_getTransactionByHash", params);
585 self.send_request_parsed(request).await
586 }
587
588 pub async fn get_witness(
591 &self,
592 from: BlockIdentifier,
593 to: Option<BlockIdentifier>,
594 ) -> Result<RpcExecutionWitness, EthClientError> {
595 let params = if let Some(to_block) = to {
596 Some(vec![from.into(), to_block.into()])
597 } else {
598 Some(vec![from.into()])
599 };
600
601 let request = RpcRequest::new("debug_executionWitness", params);
602 self.send_request_parsed(request).await
603 }
604
605 pub async fn tx_pool_content(&self) -> Result<MempoolContent, EthClientError> {
606 let request = RpcRequest::new("txpool_content", None);
607 self.send_request_parsed(request).await
608 }
609
610 pub async fn get_blob_base_fee(&self, block: BlockIdentifier) -> Result<u64, EthClientError> {
611 let params = Some(vec![block.into()]);
612 let request = RpcRequest::new("eth_blobBaseFee", params);
613
614 match self.send_request(request).await? {
615 RpcResponse::Success(result) => Ok(u64::from_str_radix(
616 serde_json::from_value::<String>(result.result)
617 .map_err(|e| RpcRequestError::SerdeJSONError {
618 method: "eth_blobBaseFee".to_string(),
619 source: e,
620 })?
621 .trim_start_matches("0x"),
622 16,
623 )
624 .map_err(|e| RpcRequestError::ParseIntError {
625 method: "eth_blobBaseFee".to_string(),
626 source: e,
627 })?),
628 RpcResponse::Error(error_response) => Err(RpcRequestError::RPCError {
629 method: "eth_blobBaseFee".to_string(),
630 message: error_response.error.message,
631 data: error_response.error.data,
632 }
633 .into()),
634 }
635 }
636
637 pub async fn test_urls(&self) -> BTreeMap<String, serde_json::Value> {
639 let mut map = BTreeMap::new();
640 for url in self.urls.iter() {
641 let response = match self
642 .send_request_to_url(url, &RpcRequest::new("eth_blockNumber", None))
643 .await
644 {
645 Ok(RpcResponse::Success(ok)) => serde_json::to_value(ok).unwrap_or_else(|e| {
646 serde_json::Value::String(format!("Failed to serialize success response: {e}"))
647 }),
648 Ok(RpcResponse::Error(e)) => serde_json::to_value(e).unwrap_or_else(|e| {
649 serde_json::Value::String(format!("Failed to serialize error response: {e}"))
650 }),
651 Err(e) => serde_json::Value::String(format!("Request error: {e}")),
652 };
653 map.insert(url.to_string(), response);
654 }
655 map
656 }
657}