1use std::thread::sleep;
2use std::time::Duration;
3
4use ethers::prelude::*;
5use ethers::providers::{Http, Provider};
6use ethers::types::{Block, Log, Transaction as EtherTransaction, TxHash};
7use merkle::{string_to_crypto_hash, MerkleTree, MerkleTreeProof, MerkleTreeRoot};
8use serde::Serialize;
9use types::{EthereumClient, SyncData, Transaction};
10
11pub mod merkle;
12pub mod types;
13pub use ethers::*;
14
15impl EthereumClient {
16 pub async fn new(
17 rpc: &str,
18 chain_name: &str,
19 chain_id: u64,
20 start_block: u64,
21 addresses: Vec<Address>,
22 ) -> Self {
23 let provider = Provider::<Http>::try_from(rpc).expect("Invalid provider");
24
25 Self {
26 chain_name: chain_name.to_owned(),
27 chain_id,
28 provider,
29 start_block,
30 addresses,
31 }
32 }
33
34 pub async fn new_sync(&self, from: u64, events: &[&str]) -> anyhow::Result<SyncData> {
35 let cur = self.provider.get_block_number().await?.as_u64() - 3;
36
37 Ok(SyncData {
38 cur,
39 from,
40 filters: Filter::new()
41 .address(self.addresses.clone())
42 .events(events)
43 .from_block(from)
44 .to_block(cur),
45 n: 50000,
46 gap: 3,
47 })
48 }
49
50 pub async fn fetch_event(
51 &mut self,
52 sync_data: &mut SyncData,
53 ) -> anyhow::Result<(Vec<Log>, u64)> {
54 let gap = sync_data.cur - sync_data.from;
55 let limit = if gap > sync_data.n {
56 sync_data.from + sync_data.n - 1
57 } else {
58 sleep(Duration::from_secs(10));
59 sync_data.cur
60 };
61
62 sync_data.filters = sync_data
63 .filters
64 .clone()
65 .from_block(sync_data.from)
66 .to_block(U64([limit]));
67
68 let mut number = limit + 1;
69 if gap > sync_data.n {
70 sync_data.from = limit + 1;
71 } else {
72 sync_data.from = limit;
73 number = self.provider.get_block_number().await?.as_u64() - sync_data.gap;
74 sync_data.cur = number;
75 }
76 let logs = self.provider.get_logs(&sync_data.filters).await?;
77 Ok((logs, number))
78 }
79
80 pub async fn get_block_count(&self) -> anyhow::Result<u64> {
81 Ok(self.provider.get_block_number().await?.as_u64())
82 }
83
84 pub async fn get_block(&self, block_number: u64) -> anyhow::Result<Option<Block<TxHash>>> {
85 Ok(self.provider.get_block(block_number).await?)
86 }
87
88 pub async fn get_block_transactions(
90 &self,
91 block_number: u64,
92 ) -> anyhow::Result<Vec<EtherTransaction>> {
93 let block = self.get_block(block_number).await?;
94 if let Some(block) = block {
95 let mut transactions = Vec::new();
96 for tx_hash in block.transactions {
97 if let Some(tx) = self.get_transaction(tx_hash).await? {
98 transactions.push(tx);
99 }
100 }
101 Ok(transactions)
102 } else {
103 Ok(vec![])
104 }
105 }
106
107 pub async fn get_transaction(
108 &self,
109 tx_hash: TxHash,
110 ) -> anyhow::Result<Option<EtherTransaction>> {
111 Ok(self.provider.get_transaction(tx_hash).await?)
112 }
113
114 pub async fn get_transaction_receipt(
115 &self,
116 tx_hash: TxHash,
117 ) -> anyhow::Result<Option<TransactionReceipt>> {
118 Ok(self.provider.get_transaction_receipt(tx_hash).await?)
119 }
120
121 pub async fn get_logs(&self, start_block: u64, end_block: u64) -> anyhow::Result<Vec<Log>> {
122 Ok(self
123 .provider
124 .get_logs(&Filter::new().from_block(start_block).to_block(end_block))
125 .await?)
126 }
127
128 fn data_slice<T>(datas: &Vec<T>) -> Vec<Vec<u8>>
129 where
130 T: Serialize,
131 {
132 datas
133 .iter()
134 .map(|f| string_to_crypto_hash(&serde_json::to_string(f).unwrap()).to_vec())
135 .collect()
136 }
137
138 pub async fn get_transaction_merkle(&self, block: &Block<H256>) -> anyhow::Result<MerkleTree> {
139 let mut txs = Vec::new();
140 for x in block.transactions.clone() {
141 if let Some(receipt) = self.get_transaction_receipt(x).await? {
142 txs.push(serde_json::to_vec(&Transaction {
143 tx_hash: serde_json::to_string(&receipt.transaction_hash)?,
144 index: receipt.transaction_index.as_u64(),
145 logs: receipt
146 .logs
147 .iter()
148 .map(|f| serde_json::to_string(f).unwrap())
149 .collect(),
150 from: format!("{:?}", receipt.from),
151 to: format!("{:?}", receipt.to),
152 block_hash: format!("{:?}", receipt.block_hash),
153 root: receipt.root.unwrap_or_default().to_string(),
154 logs_bloom: receipt.logs_bloom.to_string(),
155 })?);
156 }
157 }
158 Ok(MerkleTree::build(&txs))
159 }
160
161 pub async fn get_root_merkle(
162 &self,
163 block: &Block<H256>,
164 index: Option<u64>,
165 ) -> anyhow::Result<(MerkleTreeRoot, MerkleTreeProof, Vec<u8>)> {
166 let mut items = Vec::new();
167 let mut i = 0;
168 let mut count = 0;
169 for tx_hash in block.transactions.clone() {
170 if let Some(receipt) = self.get_transaction_receipt(tx_hash).await? {
171 items.push(serde_json::to_vec(&Transaction {
172 tx_hash: serde_json::to_string(&receipt.transaction_hash).unwrap(),
173 index: receipt.transaction_index.as_u64(),
174 logs: receipt
175 .logs
176 .iter()
177 .map(|f| serde_json::to_string(f).unwrap())
178 .collect(),
179 from: format!("{:?}", receipt.from),
180 to: format!("{:?}", receipt.to),
181 block_hash: format!("{:?}", receipt.block_hash),
182 root: receipt.root.unwrap_or_default().to_string(),
183 logs_bloom: receipt.logs_bloom.to_string(),
184 })?);
185 if let Some(c) = index {
186 if receipt.transaction_index.as_u64() == c {
187 i = count;
188 }
189 }
190 count += 1;
191 }
192 }
193 let merkle = MerkleTree::build(&items);
194
195 Ok((merkle.root, merkle.proofs[i].clone(), items[i].clone()))
196 }
197
198 pub fn get_hash_merkle(
199 block: &Block<H256>,
200 transaction_hash: Option<H256>,
201 ) -> (MerkleTreeRoot, MerkleTreeProof) {
202 let mut tx_hashs = block.transactions.clone();
203 tx_hashs.push(block.transactions_root);
204
205 let index = if let Some(hx) = transaction_hash {
206 tx_hashs
207 .iter()
208 .position(|h| h == &hx)
209 .expect("Transaction hash not found")
210 } else {
211 0
212 };
213 let hash_items = Self::data_slice(&tx_hashs);
214 let merkle = MerkleTree::build(&hash_items);
215 (merkle.root, merkle.proofs[index].clone())
216 }
217}