1use crate::{
2 arrow_reader::{self, BlockReader, LogReader, TraceReader, TransactionReader},
3 simple_types::{Block, Log, Trace, Transaction},
4};
5use anyhow::Context;
6use arrayvec::ArrayVec;
7use arrow::array::RecordBatch;
8
9fn to_opt<T>(val: Result<T, arrow_reader::ReadError>) -> anyhow::Result<Option<T>> {
10 match val {
11 Ok(val) => Ok(Some(val)),
12 Err(arrow_reader::ReadError::ColumnError(arrow_reader::ColumnError {
15 err: arrow_reader::ColumnErrorType::NotFound,
16 ..
17 })) => Ok(None),
18 Err(e) => Err(anyhow::Error::new(e)),
20 }
21}
22
23fn to_nested_opt<T>(val: Result<Option<T>, arrow_reader::ReadError>) -> anyhow::Result<Option<T>> {
24 match to_opt(val) {
25 Ok(Some(Some(val))) => Ok(Some(val)),
26 Ok(Some(None)) => Ok(None),
27 Ok(None) => Ok(None),
28 Err(e) => Err(e),
29 }
30}
31
32impl TryFrom<LogReader<'_>> for Log {
33 type Error = anyhow::Error;
34
35 fn try_from(reader: LogReader<'_>) -> Result<Self, Self::Error> {
36 let removed = to_nested_opt(reader.removed()).context("read field removed")?;
37 let log_index = to_opt(reader.log_index()).context("read field log_index")?;
38 let transaction_index =
39 to_opt(reader.transaction_index()).context("read field transaction_index")?;
40 let transaction_hash =
41 to_opt(reader.transaction_hash()).context("read field transaction_hash")?;
42 let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?;
43 let block_number = to_opt(reader.block_number()).context("read field block_number")?;
44 let address = to_opt(reader.address()).context("read field address")?;
45 let data = to_opt(reader.data()).context("read field data")?;
46 let mut topics = ArrayVec::new();
47 let topic0 = to_nested_opt(reader.topic0()).context("read field topic0")?;
48 topics.push(topic0);
49 let topic1 = to_nested_opt(reader.topic1()).context("read field topic1")?;
50 topics.push(topic1);
51 let topic2 = to_nested_opt(reader.topic2()).context("read field topic2")?;
52 topics.push(topic2);
53 let topic3 = to_nested_opt(reader.topic3()).context("read field topic3")?;
54 topics.push(topic3);
55 Ok(Self {
56 removed,
57 log_index,
58 transaction_index,
59 transaction_hash,
60 block_hash,
61 block_number,
62 address,
63 data,
64 topics,
65 })
66 }
67}
68
69impl TryFrom<BlockReader<'_>> for Block {
70 type Error = anyhow::Error;
71
72 fn try_from(reader: BlockReader<'_>) -> Result<Self, Self::Error> {
73 let number = to_opt(reader.number()).context("read field number")?;
74 let hash = to_opt(reader.hash()).context("read field hash")?;
75 let parent_hash = to_opt(reader.parent_hash()).context("read field parent_hash")?;
76 let nonce = to_nested_opt(reader.nonce()).context("read field nonce")?;
77 let sha3_uncles = to_opt(reader.sha3_uncles()).context("read field sha3_uncles")?;
78 let logs_bloom = to_opt(reader.logs_bloom()).context("read field logs_bloom")?;
79 let transactions_root =
80 to_opt(reader.transactions_root()).context("read field transactions_root")?;
81 let state_root = to_opt(reader.state_root()).context("read field state_root")?;
82 let receipts_root = to_opt(reader.receipts_root()).context("read field receipts_root")?;
83 let miner = to_opt(reader.miner()).context("read field miner")?;
84 let difficulty = to_nested_opt(reader.difficulty()).context("read field difficulty")?;
85 let total_difficulty =
86 to_nested_opt(reader.total_difficulty()).context("read field total_difficulty")?;
87 let extra_data = to_opt(reader.extra_data()).context("read field extra_data")?;
88 let size = to_opt(reader.size()).context("read field size")?;
89 let gas_limit = to_opt(reader.gas_limit()).context("read field gas_limit")?;
90 let gas_used = to_opt(reader.gas_used()).context("read field gas_used")?;
91 let timestamp = to_opt(reader.timestamp()).context("read field timestamp")?;
92 let uncles = to_nested_opt(reader.uncles()).context("read field uncles")?;
93 let base_fee_per_gas =
94 to_nested_opt(reader.base_fee_per_gas()).context("read field base_fee_per_gas")?;
95 let blob_gas_used =
96 to_nested_opt(reader.blob_gas_used()).context("read field blob_gas_used")?;
97 let excess_blob_gas =
98 to_nested_opt(reader.excess_blob_gas()).context("read field excess_blob_gas")?;
99 let parent_beacon_block_root = to_nested_opt(reader.parent_beacon_block_root())
100 .context("read field parent_beacon_block_root")?;
101 let withdrawals_root =
102 to_nested_opt(reader.withdrawals_root()).context("read field withdrawals_root")?;
103 let withdrawals = to_nested_opt(reader.withdrawals()).context("read field withdrawals")?;
104 let l1_block_number =
105 to_nested_opt(reader.l1_block_number()).context("read field l1_block_number")?;
106 let send_count = to_nested_opt(reader.send_count()).context("read field send_count")?;
107 let send_root = to_nested_opt(reader.send_root()).context("read field send_root")?;
108 let mix_hash = to_nested_opt(reader.mix_hash()).context("read field mix_hash")?;
109
110 Ok(Self {
111 number,
112 hash,
113 parent_hash,
114 nonce,
115 sha3_uncles,
116 logs_bloom,
117 transactions_root,
118 state_root,
119 receipts_root,
120 miner,
121 difficulty,
122 total_difficulty,
123 extra_data,
124 size,
125 gas_limit,
126 gas_used,
127 timestamp,
128 uncles,
129 base_fee_per_gas,
130 blob_gas_used,
131 excess_blob_gas,
132 parent_beacon_block_root,
133 withdrawals_root,
134 withdrawals,
135 l1_block_number,
136 send_count,
137 send_root,
138 mix_hash,
139 })
140 }
141}
142
143impl TryFrom<TransactionReader<'_>> for Transaction {
144 type Error = anyhow::Error;
145
146 fn try_from(reader: TransactionReader<'_>) -> Result<Self, Self::Error> {
147 let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?;
148 let block_number = to_opt(reader.block_number()).context("read field block_number")?;
149 let from = to_nested_opt(reader.from()).context("read field from")?;
150 let gas = to_opt(reader.gas()).context("read field gas")?;
151 let gas_price = to_nested_opt(reader.gas_price()).context("read field gas_price")?;
152 let hash = to_opt(reader.hash()).context("read field hash")?;
153 let input = to_opt(reader.input()).context("read field input")?;
154 let nonce = to_opt(reader.nonce()).context("read field nonce")?;
155 let to = to_nested_opt(reader.to()).context("read field to")?;
156 let transaction_index =
157 to_opt(reader.transaction_index()).context("read field transaction_index")?;
158 let value = to_opt(reader.value()).context("read field value")?;
159 let v = to_nested_opt(reader.v()).context("read field v")?;
160 let r = to_nested_opt(reader.r()).context("read field r")?;
161 let s = to_nested_opt(reader.s()).context("read field s")?;
162 let y_parity = to_nested_opt(reader.y_parity()).context("read field y_parity")?;
163 let max_priority_fee_per_gas = to_nested_opt(reader.max_priority_fee_per_gas())
164 .context("read field max_priority_fee_per_gas")?;
165 let max_fee_per_gas =
166 to_nested_opt(reader.max_fee_per_gas()).context("read field max_fee_per_gas")?;
167 let chain_id = to_nested_opt(reader.chain_id()).context("read field chain_id")?;
168 let access_list = to_nested_opt(reader.access_list()).context("read field access_list")?;
169 let authorization_list =
170 to_nested_opt(reader.authorization_list()).context("read field authorization_list")?;
171 let max_fee_per_blob_gas = to_nested_opt(reader.max_fee_per_blob_gas())
172 .context("read field max_fee_per_blob_gas")?;
173 let blob_versioned_hashes = to_nested_opt(reader.blob_versioned_hashes())
174 .context("read field blob_versioned_hashes")?;
175 let cumulative_gas_used =
176 to_opt(reader.cumulative_gas_used()).context("read field cumulative_gas_used")?;
177 let effective_gas_price =
178 to_opt(reader.effective_gas_price()).context("read field effective_gas_price")?;
179 let gas_used = to_opt(reader.gas_used()).context("read field gas_used")?;
180 let contract_address =
181 to_nested_opt(reader.contract_address()).context("read field contract_address")?;
182 let logs_bloom = to_opt(reader.logs_bloom()).context("read field logs_bloom")?;
183 let type_ = to_nested_opt(reader.type_()).context("read field type_")?;
184 let root = to_nested_opt(reader.root()).context("read field root")?;
185 let status = to_nested_opt(reader.status()).context("read field status")?;
186 let l1_fee = to_nested_opt(reader.l1_fee()).context("read field l1_fee")?;
187 let l1_gas_price =
188 to_nested_opt(reader.l1_gas_price()).context("read field l1_gas_price")?;
189 let l1_gas_used = to_nested_opt(reader.l1_gas_used()).context("read field l1_gas_used")?;
190 let l1_fee_scalar =
191 to_nested_opt(reader.l1_fee_scalar()).context("read field l1_fee_scalar")?;
192 let gas_used_for_l1 =
193 to_nested_opt(reader.gas_used_for_l1()).context("read field gas_used_for_l1")?;
194 let blob_gas_price =
195 to_nested_opt(reader.blob_gas_price()).context("read field blob_gas_price")?;
196 let blob_gas_used =
197 to_nested_opt(reader.blob_gas_used()).context("read field blob_gas_used")?;
198 let deposit_nonce =
199 to_nested_opt(reader.deposit_nonce()).context("read field deposit_nonce")?;
200 let deposit_receipt_version = to_nested_opt(reader.deposit_receipt_version())
201 .context("read field deposit_receipt_version")?;
202 let l1_base_fee_scalar =
203 to_nested_opt(reader.l1_base_fee_scalar()).context("read field l1_base_fee_scalar")?;
204 let l1_blob_base_fee =
205 to_nested_opt(reader.l1_blob_base_fee()).context("read field l1_blob_base_fee")?;
206 let l1_blob_base_fee_scalar = to_nested_opt(reader.l1_blob_base_fee_scalar())
207 .context("read field l1_blob_base_fee_scalar")?;
208 let l1_block_number =
209 to_nested_opt(reader.l1_block_number()).context("read field l1_block_number")?;
210 let mint = to_nested_opt(reader.mint()).context("read field mint")?;
211 let sighash = to_nested_opt(reader.sighash()).context("read field sighash")?;
212 let source_hash = to_nested_opt(reader.source_hash()).context("read field source_hash")?;
213
214 Ok(Self {
215 block_hash,
216 block_number,
217 from,
218 gas,
219 gas_price,
220 hash,
221 input,
222 nonce,
223 to,
224 transaction_index,
225 value,
226 v,
227 r,
228 s,
229 y_parity,
230 max_priority_fee_per_gas,
231 max_fee_per_gas,
232 chain_id,
233 access_list,
234 authorization_list,
235 max_fee_per_blob_gas,
236 blob_versioned_hashes,
237 cumulative_gas_used,
238 effective_gas_price,
239 gas_used,
240 contract_address,
241 logs_bloom,
242 type_,
243 root,
244 status,
245 l1_fee,
246 l1_gas_price,
247 l1_gas_used,
248 l1_fee_scalar,
249 gas_used_for_l1,
250 blob_gas_price,
251 blob_gas_used,
252 deposit_nonce,
253 deposit_receipt_version,
254 l1_base_fee_scalar,
255 l1_blob_base_fee,
256 l1_blob_base_fee_scalar,
257 l1_block_number,
258 mint,
259 sighash,
260 source_hash,
261 })
262 }
263}
264
265impl Block {
266 pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
268 let mut blocks = Vec::new();
269 for block_reader in BlockReader::iter(batch) {
270 blocks.push(
271 block_reader
272 .try_into()
273 .context("convert block reader to block")?,
274 );
275 }
276 Ok(blocks)
277 }
278}
279
280impl Transaction {
281 pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
283 let mut transactions = Vec::new();
284 for transaction_reader in TransactionReader::iter(batch) {
285 transactions.push(
286 transaction_reader
287 .try_into()
288 .context("convert transaction reader to transaction")?,
289 );
290 }
291 Ok(transactions)
292 }
293}
294
295impl TryFrom<TraceReader<'_>> for Trace {
296 type Error = anyhow::Error;
297
298 fn try_from(reader: TraceReader<'_>) -> Result<Self, Self::Error> {
299 let from = to_nested_opt(reader.from()).context("read field from")?;
300 let to = to_nested_opt(reader.to()).context("read field to")?;
301 let call_type = to_nested_opt(reader.call_type()).context("read field call_type")?;
302 let gas = to_nested_opt(reader.gas()).context("read field gas")?;
303 let input = to_nested_opt(reader.input()).context("read field input")?;
304 let init = to_nested_opt(reader.init()).context("read field init")?;
305 let value = to_nested_opt(reader.value()).context("read field value")?;
306 let author = to_nested_opt(reader.author()).context("read field author")?;
307 let reward_type = to_nested_opt(reader.reward_type()).context("read field reward_type")?;
308 let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?;
309 let block_number = to_opt(reader.block_number()).context("read field block_number")?;
310 let address = to_nested_opt(reader.address()).context("read field address")?;
311 let code = to_nested_opt(reader.code()).context("read field code")?;
312 let gas_used = to_nested_opt(reader.gas_used()).context("read field gas_used")?;
313 let output = to_nested_opt(reader.output()).context("read field output")?;
314 let subtraces = to_nested_opt(reader.subtraces()).context("read field subtraces")?;
315 let trace_address =
316 to_nested_opt(reader.trace_address()).context("read field trace_address")?;
317 let transaction_hash =
318 to_nested_opt(reader.transaction_hash()).context("read field transaction_hash")?;
319 let transaction_position = to_nested_opt(reader.transaction_position())
320 .context("read field transaction_position")?;
321 let type_ = to_nested_opt(reader.type_()).context("read field type_")?;
322 let error = to_nested_opt(reader.error()).context("read field error")?;
323 let sighash = to_nested_opt(reader.sighash()).context("read field sighash")?;
324 let action_address =
325 to_nested_opt(reader.action_address()).context("read field action_address")?;
326 let balance = to_nested_opt(reader.balance()).context("read field balance")?;
327 let refund_address =
328 to_nested_opt(reader.refund_address()).context("read field refund_address")?;
329
330 Ok(Self {
331 from,
332 to,
333 call_type,
334 gas,
335 input,
336 init,
337 value,
338 author,
339 reward_type,
340 block_hash,
341 block_number,
342 address,
343 code,
344 gas_used,
345 output,
346 subtraces,
347 trace_address,
348 transaction_hash,
349 transaction_position,
350 type_,
351 error,
352 sighash,
353 action_address,
354 balance,
355 refund_address,
356 })
357 }
358}
359
360impl Log {
361 pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
363 let mut logs = Vec::new();
364 for log_reader in LogReader::iter(batch) {
365 logs.push(log_reader.try_into().context("convert log reader to log")?);
366 }
367 Ok(logs)
368 }
369}
370
371impl Trace {
372 pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
374 let mut traces = Vec::new();
375 for trace_reader in TraceReader::iter(batch) {
376 traces.push(
377 trace_reader
378 .try_into()
379 .context("convert trace reader to trace")?,
380 );
381 }
382 Ok(traces)
383 }
384}