cherry_evm_validate/
lib.rs

1mod issues_collector;
2
3pub use issues_collector::{DataContext, IssueCollector, IssueCollectorConfig, ReportFormat};
4
5use std::collections::BTreeMap;
6
7use anyhow::{anyhow, Context, Result};
8use arrow::array::{
9    Array, AsArray, BinaryArray, Decimal256Array, GenericByteArray, GenericListArray, ListArray,
10    PrimitiveArray, StructArray, UInt64Array, UInt8Array,
11};
12use arrow::datatypes::{i256, Decimal256Type, GenericBinaryType, UInt8Type};
13use arrow::{datatypes::UInt64Type, record_batch::RecordBatch};
14
15use alloy_consensus::proofs::{calculate_receipt_root, calculate_transaction_root};
16use alloy_consensus::{
17    Eip658Value, Receipt, ReceiptEnvelope, SignableTransaction, TxEip1559, TxEip2930, TxEip4844,
18    TxEip4844Variant, TxEnvelope, TxLegacy,
19};
20use alloy_eips::eip2930::{AccessList, AccessListItem};
21use alloy_primitives::{
22    Address, Bloom, Bytes, FixedBytes, Log, PrimitiveSignature, TxKind, Uint, B256, U256,
23};
24
25struct LogArray<'a> {
26    block_number: &'a PrimitiveArray<UInt64Type>,
27    tx_index: &'a PrimitiveArray<UInt64Type>,
28    log_index: &'a PrimitiveArray<UInt64Type>,
29    address: &'a GenericByteArray<GenericBinaryType<i32>>,
30    topic0: &'a GenericByteArray<GenericBinaryType<i32>>,
31    topic1: &'a GenericByteArray<GenericBinaryType<i32>>,
32    topic2: &'a GenericByteArray<GenericBinaryType<i32>>,
33    topic3: &'a GenericByteArray<GenericBinaryType<i32>>,
34    data: &'a GenericByteArray<GenericBinaryType<i32>>,
35}
36
37struct TransactionsArray<'a> {
38    block_number: &'a PrimitiveArray<UInt64Type>,
39    gas_limit: &'a PrimitiveArray<Decimal256Type>,
40    gas_price: &'a PrimitiveArray<Decimal256Type>,
41    hash: &'a GenericByteArray<GenericBinaryType<i32>>,
42    input: &'a GenericByteArray<GenericBinaryType<i32>>,
43    nonce: &'a PrimitiveArray<Decimal256Type>,
44    to: &'a GenericByteArray<GenericBinaryType<i32>>,
45    tx_index: &'a PrimitiveArray<UInt64Type>,
46    value: &'a PrimitiveArray<Decimal256Type>,
47    v: &'a PrimitiveArray<UInt8Type>,
48    r: &'a GenericByteArray<GenericBinaryType<i32>>,
49    s: &'a GenericByteArray<GenericBinaryType<i32>>,
50    max_priority_fee_per_gas: &'a PrimitiveArray<Decimal256Type>,
51    max_fee_per_gas: &'a PrimitiveArray<Decimal256Type>,
52    chain_id: &'a PrimitiveArray<Decimal256Type>,
53    cumulative_gas_used: &'a PrimitiveArray<Decimal256Type>,
54    contract_address: &'a GenericByteArray<GenericBinaryType<i32>>,
55    logs_bloom: &'a GenericByteArray<GenericBinaryType<i32>>,
56    tx_type: &'a PrimitiveArray<UInt8Type>,
57    status: &'a PrimitiveArray<UInt8Type>,
58    sighash: &'a GenericByteArray<GenericBinaryType<i32>>,
59    access_list: &'a GenericListArray<i32>,
60    max_fee_per_blob_gas: &'a PrimitiveArray<Decimal256Type>,
61    blob_versioned_hashes: &'a GenericListArray<i32>,
62}
63
64struct BlockArray<'a> {
65    number: &'a PrimitiveArray<UInt64Type>,
66    receipts_root: &'a GenericByteArray<GenericBinaryType<i32>>,
67    transactions_root: &'a GenericByteArray<GenericBinaryType<i32>>,
68}
69
70/// Checks that:
71///
72/// - Everything is ordered by (block_number, tx_index/log_index)
73///
74/// - No gaps in (block_number, tx_index/log_index)
75///
76/// - block_hash/tx_hash matches with block_number/(block_number, tx_index)
77///
78/// - parent hash matches with previous block's hash
79///
80pub fn validate_block_data(
81    blocks: &RecordBatch,
82    transactions: &RecordBatch,
83    logs: &RecordBatch,
84    traces: &RecordBatch,
85) -> Result<()> {
86    let block_numbers = blocks
87        .column_by_name("number")
88        .context("get block number column")?
89        .as_any()
90        .downcast_ref::<UInt64Array>()
91        .context("get block number column as u64")?;
92
93    if block_numbers.null_count() > 0 {
94        return Err(anyhow!("block.number column can't have nulls"));
95    }
96
97    let first_block_num = block_numbers
98        .iter()
99        .next()
100        .map(Option::unwrap)
101        .unwrap_or_default();
102    let mut current_bn = first_block_num;
103    for bn in block_numbers.iter().skip(1) {
104        let bn = bn.unwrap();
105        if current_bn + 1 != bn {
106            return Err(anyhow!(
107                "block.number column is not consistent. {} != {}",
108                current_bn + 1,
109                bn
110            ));
111        }
112        current_bn = bn;
113    }
114
115    let block_hashes = blocks
116        .column_by_name("hash")
117        .context("get block hash column")?
118        .as_any()
119        .downcast_ref::<BinaryArray>()
120        .context("get block hash as binary array")?;
121
122    let block_parent_hashes = blocks
123        .column_by_name("parent_hash")
124        .context("get block parent_hash column")?
125        .as_any()
126        .downcast_ref::<BinaryArray>()
127        .context("get block parent_hash as binary array")?;
128
129    let get_block_hash = |block_num: u64| -> Option<&[u8]> {
130        let pos = usize::try_from(block_num.checked_sub(first_block_num)?).unwrap();
131        if pos < block_hashes.len() {
132            Some(block_hashes.value(pos))
133        } else {
134            None
135        }
136    };
137
138    if block_hashes.null_count() > 0 {
139        return Err(anyhow!("block.hash column can't have nulls"));
140    }
141
142    if block_parent_hashes.null_count() > 0 {
143        return Err(anyhow!("block.parent_has column can't have nulls"));
144    }
145
146    for (expected_parent_hash, parent_hash) in
147        block_hashes.iter().zip(block_parent_hashes.iter().skip(1))
148    {
149        let expected_parent_hash = expected_parent_hash.unwrap();
150        let parent_hash = parent_hash.unwrap();
151        if expected_parent_hash != parent_hash {
152            return Err(anyhow!(
153                "bad parent hash found. expected {}, found {}",
154                faster_hex::hex_string(expected_parent_hash),
155                faster_hex::hex_string(parent_hash)
156            ));
157        }
158    }
159
160    validate_block_hashes(get_block_hash, transactions).context("validate tx block hashes")?;
161    validate_block_hashes(get_block_hash, logs).context("validate log block hashes")?;
162    validate_block_hashes(get_block_hash, traces).context("validate trace block hashes")?;
163
164    // Validate tx ordering and check tx hashes of other tables
165
166    let mut tx_hash_mapping = vec![Vec::<[u8; 32]>::with_capacity(200); block_numbers.len()];
167
168    let tx_hashes = transactions
169        .column_by_name("hash")
170        .context("get tx hash col")?
171        .as_any()
172        .downcast_ref::<BinaryArray>()
173        .context("tx hash col as binary")?;
174    let tx_block_nums = transactions
175        .column_by_name("block_number")
176        .context("get tx block num col")?
177        .as_any()
178        .downcast_ref::<UInt64Array>()
179        .context("get tx block num col as u64")?;
180    let tx_indices = transactions
181        .column_by_name("transaction_index")
182        .context("get tx index column")?
183        .as_any()
184        .downcast_ref::<UInt64Array>()
185        .context("get tx index col as u64")?;
186
187    if tx_hashes.null_count() > 0 {
188        return Err(anyhow!("tx hash column can't have nulls"));
189    }
190    if tx_block_nums.null_count() > 0 {
191        return Err(anyhow!("tx block number column can't have nulls"));
192    }
193    if tx_indices.null_count() > 0 {
194        return Err(anyhow!("tx index column can't have nulls"));
195    }
196
197    let mut expected_tx_index = 0;
198    let mut current_block_num = first_block_num;
199
200    for ((tx_hash, tx_bn), tx_idx) in tx_hashes
201        .iter()
202        .zip(tx_block_nums.iter())
203        .zip(tx_indices.iter())
204    {
205        let tx_hash = tx_hash.unwrap();
206        let tx_bn = tx_bn.unwrap();
207        let tx_idx = tx_idx.unwrap();
208
209        if tx_bn != current_block_num {
210            if tx_bn < current_block_num {
211                return Err(anyhow!(
212                    "found wrong ordering in tx block numbers after block num {}",
213                    current_block_num
214                ));
215            }
216
217            current_block_num = tx_bn;
218            expected_tx_index = 0;
219        }
220
221        if tx_idx != expected_tx_index {
222            return Err(anyhow!(
223                "found unexpected tx index at the start of block {}",
224                current_block_num
225            ));
226        }
227        expected_tx_index += 1;
228
229        let block_pos = tx_bn
230            .checked_sub(first_block_num)
231            .with_context(|| format!("unexpected block num {} in transactions", tx_bn))?;
232        let mappings = tx_hash_mapping
233            .get_mut(usize::try_from(block_pos).unwrap())
234            .unwrap();
235
236        assert_eq!(mappings.len(), usize::try_from(tx_idx).unwrap());
237
238        if tx_hash.len() != 32 {
239            return Err(anyhow!("found bad tx hash at {},{}", tx_bn, tx_idx));
240        }
241
242        mappings.push(tx_hash.try_into().unwrap());
243    }
244
245    validate_transaction_hashes(first_block_num, &tx_hash_mapping, logs, "transaction_index")
246        .context("check tx hashes in logs")?;
247    validate_transaction_hashes(
248        first_block_num,
249        &tx_hash_mapping,
250        traces,
251        "transaction_position",
252    )
253    .context("check tx hashes in traces")?;
254
255    // VALIDATE LOG ORDERING
256
257    let log_block_nums = logs
258        .column_by_name("block_number")
259        .context("get log block num col")?
260        .as_any()
261        .downcast_ref::<UInt64Array>()
262        .context("get log block num col as u64")?;
263    let log_indices = logs
264        .column_by_name("log_index")
265        .context("get log index column")?
266        .as_any()
267        .downcast_ref::<UInt64Array>()
268        .context("get log index col as u64")?;
269
270    if log_block_nums.null_count() > 0 {
271        return Err(anyhow!("log block number column can't have nulls"));
272    }
273    if log_indices.null_count() > 0 {
274        return Err(anyhow!("log index column can't have nulls"));
275    }
276
277    let mut expected_log_index = 0;
278    let mut current_block_num = first_block_num;
279
280    for (log_idx, log_bn) in log_indices.iter().zip(log_block_nums.iter()) {
281        let log_idx = log_idx.unwrap();
282        let log_bn = log_bn.unwrap();
283
284        if log_bn != current_block_num {
285            if log_bn < current_block_num {
286                return Err(anyhow!(
287                    "found wrong ordering in log block numbers after block num {}",
288                    current_block_num
289                ));
290            }
291
292            expected_log_index = 0;
293            current_block_num = log_bn;
294        }
295
296        if log_idx != expected_log_index {
297            return Err(anyhow!(
298                "found unexpected log index, expected {},{} but got {} for index",
299                log_bn,
300                expected_log_index,
301                log_idx
302            ));
303        }
304        expected_log_index += 1;
305    }
306
307    // VALIDATE TRACE ORDERING
308
309    let trace_block_nums = traces
310        .column_by_name("block_number")
311        .context("get trace block num col")?
312        .as_any()
313        .downcast_ref::<UInt64Array>()
314        .context("get trace block num col as u64")?;
315    let trace_tx_indices = traces
316        .column_by_name("transaction_position")
317        .context("get trace tx index column")?
318        .as_any()
319        .downcast_ref::<UInt64Array>()
320        .context("get trace tx index col as u64")?;
321
322    if trace_block_nums.null_count() > 0 {
323        return Err(anyhow!("log block number column can't have nulls"));
324    }
325
326    let mut current_tx_pos = 0;
327    let mut current_block_num = first_block_num;
328
329    for (trace_bn, trace_tx_pos) in trace_block_nums.iter().zip(trace_tx_indices.iter()) {
330        let prev_bn = current_block_num;
331
332        let trace_bn = trace_bn.unwrap();
333
334        if trace_bn != current_block_num {
335            if trace_bn < current_block_num {
336                return Err(anyhow!(
337                    "found wrong ordering in trace block numbers after block num {}",
338                    current_block_num
339                ));
340            }
341
342            current_tx_pos = 0;
343            current_block_num = trace_bn;
344        }
345
346        let tx_pos = match trace_tx_pos {
347            Some(x) => x,
348            // This can be None for block reward traces and maybe for other traces that don't associate to blocks for some reason
349            None => continue,
350        };
351
352        if tx_pos < current_tx_pos {
353            return Err(anyhow!(
354                "found bad tx position ordering after {},{}",
355                prev_bn,
356                current_tx_pos
357            ));
358        }
359        current_tx_pos = tx_pos;
360    }
361
362    Ok(())
363}
364
365fn validate_block_hashes<'a, F: Fn(u64) -> Option<&'a [u8]>>(
366    get_block_hash: F,
367    data: &RecordBatch,
368) -> Result<()> {
369    let block_hashes = data
370        .column_by_name("block_hash")
371        .context("get block hash column")?
372        .as_any()
373        .downcast_ref::<BinaryArray>()
374        .context("block hash col as binary")?;
375    let block_numbers = data
376        .column_by_name("block_number")
377        .context("get block number column")?
378        .as_any()
379        .downcast_ref::<UInt64Array>()
380        .context("block number as u64")?;
381
382    if block_hashes.null_count() > 0 {
383        return Err(anyhow!("block hash column can't have nulls"));
384    }
385
386    if block_numbers.null_count() > 0 {
387        return Err(anyhow!("block number column can't have nulls"));
388    }
389
390    for (bn, hash) in block_numbers.iter().zip(block_hashes.iter()) {
391        let bn = bn.unwrap();
392        let hash = hash.unwrap();
393
394        let expected = match get_block_hash(bn) {
395            Some(h) => h,
396            None => {
397                return Err(anyhow!("couldn't find expected hash for block {}", bn));
398            }
399        };
400
401        if expected != hash {
402            return Err(anyhow!(
403                "block hash mismatch at block {}. expected {} got {}",
404                bn,
405                faster_hex::hex_string(expected),
406                faster_hex::hex_string(hash)
407            ));
408        }
409    }
410
411    Ok(())
412}
413
414fn validate_transaction_hashes(
415    first_block_num: u64,
416    expected_tx_hashes: &[Vec<[u8; 32]>],
417    data: &RecordBatch,
418    tx_index_col_name: &str,
419) -> Result<()> {
420    let tx_indices = data
421        .column_by_name(tx_index_col_name)
422        .context("get tx index column")?
423        .as_any()
424        .downcast_ref::<UInt64Array>()
425        .context("get tx index col as u64")?;
426    let block_numbers = data
427        .column_by_name("block_number")
428        .context("get block number column")?
429        .as_any()
430        .downcast_ref::<UInt64Array>()
431        .context("block number as u64")?;
432    let tx_hashes = data
433        .column_by_name("transaction_hash")
434        .context("get tx hash column")?
435        .as_any()
436        .downcast_ref::<BinaryArray>()
437        .context("get tx hash col as binary")?;
438
439    if block_numbers.null_count() > 0 {
440        return Err(anyhow!("block number column can't have nulls"));
441    }
442
443    for ((tx_idx, tx_hash), bn) in tx_indices
444        .iter()
445        .zip(tx_hashes.iter())
446        .zip(block_numbers.iter())
447    {
448        // Skip entries that don't associate to transactions, e.g. block reward traces.
449        if let Some(tx_idx) = tx_idx {
450            let bn = bn.unwrap();
451            let tx_hash = match tx_hash {
452                Some(h) => h,
453                None => {
454                    return Err(anyhow!("tx hash no found for tx {},{}", bn, tx_idx));
455                }
456            };
457
458            let block_i = match bn.checked_sub(first_block_num) {
459                Some(i) => usize::try_from(i).unwrap(),
460                None => return Err(anyhow!("bad block num: {}", bn)),
461            };
462
463            let expected_tx_hash = expected_tx_hashes
464                .get(block_i)
465                .with_context(|| format!("block {} not found in given data", bn))?
466                .get(usize::try_from(tx_idx).unwrap())
467                .with_context(|| format!("tx hash data for tx {},{} not found", bn, tx_idx))?;
468
469            if expected_tx_hash != tx_hash {
470                return Err(anyhow!(
471                    "tx hash mismatch for tx {},{}. Expected {}, Found {}",
472                    bn,
473                    tx_idx,
474                    faster_hex::hex_string(expected_tx_hash),
475                    faster_hex::hex_string(tx_hash)
476                ));
477            }
478        }
479    }
480
481    Ok(())
482}
483
484#[rustfmt::skip]
485pub fn validate_root_hashes(
486    blocks: &RecordBatch,
487    logs: &RecordBatch,
488    transactions: &RecordBatch,
489    issues_collector: &mut IssueCollector,
490) -> Result<()> {
491    let ic = issues_collector;
492
493    // CREATE A LOG MAPPING
494
495    let log_array = extract_log_cols_as_arrays(logs)?;
496
497    // get first log block num and tx idx
498    let mut current_block_num = log_array.block_number.value(0);
499    let mut current_tx_idx = log_array.log_index.value(0);
500    // initialize a vec to store all logs for a tx
501    let mut tx_logs = Vec::<Log>::with_capacity(20);
502    // initialize a map to store logs by block num and tx idx
503    let mut logs_by_block_num_and_tx_idx = BTreeMap::<(u64, u64), Vec<Log>>::new();
504
505    let log_iterators = log_array
506        .block_number
507        .iter()
508        .zip(log_array.log_index.iter())
509        .zip(log_array.tx_index.iter())
510        .zip(log_array.address.iter())
511        .zip(log_array.topic0.iter())
512        .zip(log_array.topic1.iter())
513        .zip(log_array.topic2.iter())
514        .zip(log_array.topic3.iter())
515        .zip(log_array.data.iter());
516
517    // iterate over logs rows
518    for (
519        (
520            (
521                (
522                    ((((block_nums_opt, log_idx_opt), tx_idx_opt), address_opt), topic0_opt),
523                    topic1_opt,
524                ),
525                topic2_opt,
526            ),
527            topic3_opt,
528        ),
529        data_opt,
530    ) in log_iterators
531    {
532        // cast values to expected types
533        let block_num = block_nums_opt.unwrap(); // Block number can't be None because we are using it as key in the logs_by_block_num_and_tx_idx mapping
534        let tx_idx = tx_idx_opt.unwrap(); // Tx index can't be None because we are using it as key in the logs_by_block_num_and_tx_idx mapping
535        let log_idx = log_idx_opt.unwrap_or_else(|| {
536            ic.report_with_context(
537                "log_idx is None",
538                DataContext::new(
539                    "Logs".to_string(),
540                    format!("Block_num {}, Tx_idx {}", block_num, tx_idx),
541                ),
542                99999,
543            )
544        });
545        ic.set_context(DataContext::new(
546            "Logs".to_string(),
547            format!(
548                "Block_num {}, Tx_idx {}, Log_idx {}",
549                block_num, tx_idx, log_idx
550            ),
551        ));
552        let address = address_opt
553            .unwrap_or_else(|| ic.report("address is None", &[0; 20]))
554            .try_into()
555            .unwrap_or_else(|_| ic.report("address is invalid", Address::ZERO));
556        let topic0: FixedBytes<32> = topic0_opt
557            .unwrap_or_else(|| ic.report("topic0 is None", &[0; 32]))
558            .try_into()
559            .unwrap_or_else(|_| ic.report("topic0 is invalid", FixedBytes::<32>::new([0; 32])));
560        let topic1: Option<FixedBytes<32>> = topic1_opt.and_then(|t| {
561            t.try_into()
562                .ok()
563                .or_else(|| ic.report("topic1 is invalid", None))
564        });
565        let topic2: Option<FixedBytes<32>> = topic2_opt.and_then(|t| {
566            t.try_into()
567                .ok()
568                .or_else(|| ic.report("topic2 is invalid", None))
569        });
570        let topic3: Option<FixedBytes<32>> = topic3_opt.and_then(|t| {
571            t.try_into()
572                .ok()
573                .or_else(|| ic.report("topic3 is invalid", None))
574        });
575        let log_data = data_opt.unwrap_or_else(|| ic.report("log_data is None", &[0; 0]));
576        let log_data = Bytes::copy_from_slice(log_data);
577
578        // create a vec of topics with None values removed
579        let topics: Vec<_> = [Some(topic0), topic1, topic2, topic3]
580            .into_iter()
581            .flatten()
582            .collect();
583
584        // if the block num or tx idx has changed, store the previous tx logs in the mapping, clear the logs vec and update the current block num and tx idx
585        if block_num != current_block_num || tx_idx != current_tx_idx {
586            if !tx_logs.is_empty() {
587                logs_by_block_num_and_tx_idx
588                    .insert((current_block_num, current_tx_idx), tx_logs.clone());
589                tx_logs.clear();
590            }
591            current_block_num = block_num;
592            current_tx_idx = tx_idx;
593        }
594
595        // create a log object and add it to the tx logs vec
596        let log = Log::new(address, topics, log_data).expect("log is invalid");
597        tx_logs.push(log);
598    }
599    // store the last tx logs in the mapping
600    logs_by_block_num_and_tx_idx.insert((current_block_num, current_tx_idx), tx_logs);
601    ic.set_context(DataContext::default());
602
603    // CREATE A TRANSACTION MAPPING
604    let tx_array = extract_transaction_cols_as_arrays(transactions)?;
605    let mut current_block_num = tx_array.block_number.value(0);
606    // initialize a map to store transaction root by block num
607    let mut transactions_root_by_block_num_mapping = BTreeMap::<u64, FixedBytes<32>>::new();
608    // initialize a map to store receipts by block num
609    let mut receipts_root_by_block_num_mapping = BTreeMap::<u64, FixedBytes<32>>::new();
610    // initialize a vec to store tx envelopes for a tx
611    let mut block_tx_envelopes = Vec::<TxEnvelope>::with_capacity(200);
612    // initialize a vec to store receipts for a tx
613    let mut block_tx_receipts = Vec::<ReceiptEnvelope>::with_capacity(200);
614    // initialize an empty vec of logs, used if the tx failed or doesn't have logs
615    let empty_logs = Vec::<Log>::new();
616
617    let tx_iterators = tx_array
618        .block_number
619        .iter()
620        .zip(tx_array.gas_limit.iter())
621        .zip(tx_array.gas_price.iter())
622        .zip(tx_array.hash.iter())
623        .zip(tx_array.input.iter())
624        .zip(tx_array.nonce.iter())
625        .zip(tx_array.to.iter())
626        .zip(tx_array.tx_index.iter())
627        .zip(tx_array.value.iter())
628        .zip(tx_array.v.iter())
629        .zip(tx_array.r.iter())
630        .zip(tx_array.s.iter())
631        .zip(tx_array.max_priority_fee_per_gas.iter())
632        .zip(tx_array.max_fee_per_gas.iter())
633        .zip(tx_array.chain_id.iter())
634        .zip(tx_array.cumulative_gas_used.iter())
635        .zip(tx_array.contract_address.iter())
636        .zip(tx_array.logs_bloom.iter())
637        .zip(tx_array.tx_type.iter())
638        .zip(tx_array.status.iter())
639        .zip(tx_array.sighash.iter())
640        .zip(tx_array.access_list.iter())
641        .zip(tx_array.max_fee_per_blob_gas.iter())
642        .zip(tx_array.blob_versioned_hashes.iter());
643
644    // iterate over transactions rows
645    for (((((((((((((((((((((((
646        tx_block_nums_opt
647        , tx_gas_limit_opt)
648        , tx_gas_price_opt)
649        , tx_hash_opt)
650        , tx_input_opt)
651        , tx_nonce_opt)
652        , tx_to_opt)
653        , tx_tx_idx_opt)
654        , tx_value_opt)
655        , tx_v_opt)
656        , tx_r_opt)
657        , tx_s_opt)
658        , tx_max_priority_fee_per_gas_opt)
659        , tx_max_fee_per_gas_opt)
660        , tx_chain_id_opt)
661        , tx_cumulative_gas_used_opt)
662        , tx_contract_address_opt)
663        , tx_logs_bloom_opt)
664        , tx_type_opt) 
665        , tx_status_opt)
666        , tx_sighash_opt)
667        , tx_access_list_opt)
668        , tx_max_fee_per_blob_gas_opt)
669        , tx_blob_versioned_hashes_opt) in tx_iterators {
670        // create contingent row context
671        let cont_row_ctx = match (tx_block_nums_opt, tx_tx_idx_opt) {
672            (Some(block_num), Some(tx_idx)) => {
673                format!("Block_num {}, Tx_idx {}", block_num, tx_idx)
674            }
675            _ => "Undefined".to_string(),
676        };
677
678        // Try to unwrap and cast tx_hash to a FixedBytes<32>, report if issue and set context to the contingent row
679        let expected_hash: FixedBytes<32> = match tx_hash_opt {
680            //try to unwrap tx_hash
681            None => {
682                ic.set_context(DataContext::new("Transactions".to_string(), cont_row_ctx));
683                ic.report("tx_hash is None", FixedBytes::<32>::new([0; 32]))
684            }
685            Some(hash) => match hash.try_into() {
686                //try cast to FixedBytes<32>
687                Ok(hash) => {
688                    ic.set_context(DataContext::new(
689                        "Transactions".to_string(),
690                        format!("Tx_hash {}", hash),
691                    ));
692                    hash
693                }
694                Err(_) => {
695                    ic.set_context(DataContext::new("Transactions".to_string(), cont_row_ctx));
696                    ic.report("tx_hash is invalid", FixedBytes::<32>::new([0; 32]))
697                }
698            },
699        };
700
701        // cast values to expected types
702        let block_num = tx_block_nums_opt.unwrap(); // Block number can't be None because we are using it as key in the root_by_block_num_mapping mapping
703        let gas_limit = u64::try_from(
704            tx_gas_limit_opt
705                .unwrap_or_else(|| ic.report("gas_limit is None", i256::ZERO))
706                .as_i128(),
707        )
708        .unwrap();
709        let gas_price = u128::try_from(
710            tx_gas_price_opt
711                .unwrap_or_else(|| ic.report("gas_price is None", i256::ZERO))
712                .as_i128(),
713        )
714        .unwrap();
715        let input = tx_input_opt.unwrap_or_else(|| ic.report("input is None", &[0; 0]));
716        let input = Bytes::copy_from_slice(input);
717        let nonce = u64::try_from(
718            tx_nonce_opt
719                .unwrap_or_else(|| ic.report("nonce is None", i256::ZERO))
720                .as_i128(),
721        )
722        .unwrap();
723        let to: Option<Address> = tx_to_opt.and_then(|a| {
724            a.try_into()
725                .ok()
726                .or_else(|| ic.report("to is invalid", None))
727        });
728        let tx_idx = tx_tx_idx_opt.unwrap_or_else(|| ic.report("tx_idx is None", 0));
729        let value = U256::try_from(
730            tx_value_opt
731                .unwrap_or_else(|| ic.report("value is None", i256::ZERO))
732                .as_i128(),
733        )
734        .unwrap();
735        let chain_id = tx_chain_id_opt.and_then(|id| {
736            u64::try_from(id.as_i128())
737                .ok()
738                .or_else(|| ic.report("chain_id is invalid", None))
739        });
740        // EIP-155: The recovery identifier boollean is v - 27 for legacy transactions and v = chainId * 2 + 35 for EIP-155 transactions.
741        let r_id: u8 = (chain_id.unwrap_or(1) * 2 + 35)
742            .try_into()
743            .expect("invalid chain_id, produced signiture v is out of range");
744        let v = tx_v_opt.unwrap_or_else(|| ic.report("v is None", 0));
745        let v = if v == 0 || v == 27 || v == r_id {
746            false
747        } else if v == 1 || v == 28 || v == r_id + 1 {
748            true
749        } else {
750            return Err(anyhow!("invalid v"));
751        };
752        let r: Uint<256, 4> =
753            U256::try_from_be_slice(tx_r_opt.unwrap_or_else(|| ic.report("r is None", &[0; 32])))
754                .expect("invalid r");
755        let s: Uint<256, 4> =
756            U256::try_from_be_slice(tx_s_opt.unwrap_or_else(|| ic.report("s is None", &[0; 32])))
757                .expect("invalid s");
758        let max_priority_fee_per_gas: Option<u128> =
759            tx_max_priority_fee_per_gas_opt.and_then(|value| {
760                value
761                    .as_i128()
762                    .try_into()
763                    .ok()
764                    .or_else(|| ic.report("max_priority_fee_per_gas is invalid", None))
765            });
766        let max_fee_per_gas: Option<u128> = tx_max_fee_per_gas_opt.and_then(|value| {
767            value
768                .as_i128()
769                .try_into()
770                .ok()
771                .or_else(|| ic.report("max_fee_per_gas is invalid", None))
772        });
773        let cumulative_gas_used = u64::try_from(
774            tx_cumulative_gas_used_opt
775                .unwrap_or_else(|| ic.report("cumulative_gas_used is None", i256::ZERO))
776                .as_i128(),
777        )
778        .unwrap();
779        let contract_address: Option<Address> = tx_contract_address_opt.and_then(|a| {
780            a.try_into()
781                .ok()
782                .or_else(|| ic.report("contract_address is invalid", None))
783        });
784        let logs_bloom =
785            tx_logs_bloom_opt.unwrap_or_else(|| ic.report("logs_bloom is None", &[0; 256]));
786        let status = tx_status_opt.unwrap_or_else(|| ic.report("status is None", 0));
787        let expected_sighash = tx_sighash_opt;
788        let access_list: Option<AccessList> = tx_access_list_opt.map(|array| {
789            let access_list_items = array.as_struct_opt().expect("access list is not a struct");
790            convert_arrow_array_into_access_list(access_list_items)
791                .expect("access list is invalid")
792        });
793        let max_fee_per_blob_gas: Option<u128> = tx_max_fee_per_blob_gas_opt.and_then(|value| {
794            value
795                .as_i128()
796                .try_into()
797                .ok()
798                .or_else(|| ic.report("max_fee_per_blob_gas is invalid", None))
799        });
800        let blob_versioned_hashes: Option<Vec<FixedBytes<32>>> =
801            tx_blob_versioned_hashes_opt.map(|array| {
802                let binary_array = array
803                    .as_any()
804                    .downcast_ref::<BinaryArray>()
805                    .expect("blob_versioned_hashes must be a BinaryArray");
806                convert_binary_array_32_to_fixed_hashes(binary_array)
807            });
808        let tx_type = tx_type_opt.unwrap_or(if access_list.is_some() {
809            if max_priority_fee_per_gas.is_some() {
810                if max_fee_per_blob_gas.is_some() {
811                    3
812                } else {
813                    2
814                }
815            } else {
816                1
817            }
818        } else {
819            0
820        });
821
822        // if the block num has changed, store the previous tx receipts and tx envelopes in the mapping, clear the receipts vec and update the current block num
823        if block_num != current_block_num {
824            if !block_tx_receipts.is_empty() {
825                let receipt_root = calculate_receipt_root(&block_tx_receipts);
826                let transactions_root = calculate_transaction_root(&block_tx_envelopes);
827                receipts_root_by_block_num_mapping.insert(current_block_num, receipt_root);
828                transactions_root_by_block_num_mapping.insert(current_block_num, transactions_root);
829                block_tx_receipts.clear();
830                block_tx_envelopes.clear();
831            }
832            current_block_num = block_num;
833        }
834
835        // validate sighash
836        match expected_sighash {
837            Some(expected_sighash) => {
838                let sighash: [u8; 4] = input[..4].try_into().unwrap_or_else(|_| {
839                    ic.report(
840                        "input must be at least 4 bytes long for a tx with a sighash",
841                        [0; 4],
842                    )
843                });
844                if sighash != expected_sighash {
845                    ic.report(
846                        format!(
847                            "sighash mismatch. Expected:\n{:?},\nFound:\n{:?}",
848                            expected_sighash, sighash
849                        )
850                        .as_str(),
851                        (),
852                    );
853                }
854            }
855            None => {
856                if input.len() > 4 {
857                    ic.report("sighash is None, with a non-zero input", ());
858                }
859            }
860        }
861
862        // create alloy's tx_kind object
863        let tx_kind = match contract_address {
864            None => TxKind::Call(to.unwrap_or_else(|| {
865                ic.report(
866                    "to is None, while contract_address is also None",
867                    Address::ZERO,
868                )
869            })),
870            Some(_) => TxKind::Create,
871        };
872        let primitive_sig = PrimitiveSignature::new(r, s, v);
873
874        // create alloy's tx_envelope object (to accept all tx types)
875        let tx_envelope = match tx_type {
876            0 => {
877                let tx = TxLegacy {
878                    chain_id,
879                    nonce,
880                    gas_price,
881                    gas_limit,
882                    to: tx_kind,
883                    value,
884                    input,
885                };
886                let signed_tx = tx.into_signed(primitive_sig);
887                TxEnvelope::Legacy(signed_tx)
888            }
889            1 => {
890                let tx = TxEip2930 {
891                    chain_id: chain_id.unwrap_or_else(|| {
892                        ic.report("chain_id is None, for a Eip2930 transaction", 0)
893                    }),
894                    nonce,
895                    gas_price,
896                    gas_limit,
897                    to: tx_kind,
898                    value,
899                    access_list: access_list.unwrap_or_else(|| {
900                        ic.report(
901                            "access list is None, for a Eip2930 transaction",
902                            AccessList::default(),
903                        )
904                    }),
905                    input,
906                };
907                let signed_tx = tx.into_signed(primitive_sig);
908                TxEnvelope::Eip2930(signed_tx)
909            }
910            2 => {
911                let tx = TxEip1559 {
912                    chain_id: chain_id.unwrap_or_else(|| {
913                        ic.report("chain_id is None, for a Eip1559 transaction", 0)
914                    }),
915                    nonce,
916                    gas_limit,
917                    max_fee_per_gas: max_fee_per_gas.unwrap_or_else(|| {
918                        ic.report("max fee per gas is None, for a Eip1559 transaction", 0)
919                    }),
920                    max_priority_fee_per_gas: max_priority_fee_per_gas.unwrap_or_else(|| {
921                        ic.report(
922                            "max priority fee per gas is None, for a Eip1559 transaction",
923                            0,
924                        )
925                    }),
926                    to: tx_kind,
927                    value,
928                    access_list: access_list.unwrap_or_else(|| {
929                        ic.report(
930                            "access list is None, for a Eip1559 transaction",
931                            AccessList::default(),
932                        )
933                    }),
934                    input,
935                };
936                let signed_tx = tx.into_signed(primitive_sig);
937                TxEnvelope::Eip1559(signed_tx)
938            }
939            3 => {
940                let tx = TxEip4844Variant::TxEip4844(TxEip4844 {
941                    chain_id: chain_id.unwrap_or_else(|| {
942                        ic.report("chain_id is None, for a Eip4844 transaction", 0)
943                    }),
944                    nonce,
945                    gas_limit,
946                    max_fee_per_gas: max_fee_per_gas.unwrap_or_else(|| {
947                        ic.report("max fee per gas is None, for a Eip4844 transaction", 0)
948                    }),
949                    max_priority_fee_per_gas: max_priority_fee_per_gas.unwrap_or_else(|| {
950                        ic.report(
951                            "max priority fee per gas is None, for a Eip4844 transaction",
952                            0,
953                        )
954                    }),
955                    to: to.unwrap_or_else(|| {
956                        ic.report("to is None, for a Eip4844 transaction", Address::ZERO)
957                    }),
958                    value,
959                    access_list: access_list.unwrap_or_else(|| {
960                        ic.report(
961                            "access list is None, for a Eip4844 transaction",
962                            AccessList::default(),
963                        )
964                    }),
965                    blob_versioned_hashes: blob_versioned_hashes.unwrap_or_else(|| {
966                        ic.report(
967                            "blob versioned hashes is None, for a Eip4844 transaction",
968                            Vec::<FixedBytes<32>>::new(),
969                        )
970                    }),
971                    max_fee_per_blob_gas: max_fee_per_blob_gas.unwrap_or_else(|| {
972                        ic.report("max fee per blob gas is None, for a Eip4844 transaction", 0)
973                    }),
974                    input,
975                });
976                let signed_tx = tx.into_signed(primitive_sig);
977                TxEnvelope::Eip4844(signed_tx)
978            }
979            // 4 => TypedTransaction::Eip7702(TxEip7702{
980            //     chain_id,
981            //     nonce,
982            //     gas_limit,
983            //     max_fee_per_gas,
984            //     max_priority_fee_per_gas,
985            //     to,
986            //     value,
987            //     access_list,
988            //     authorization_list,
989            //     input,
990            // }),
991            _ => return Err(anyhow!("Invalid tx type: {}", tx_type)),
992        };
993
994        //validate tx hash
995        let calculated_tx_hash = tx_envelope.tx_hash();
996        if calculated_tx_hash != &expected_hash {
997            ic.report(
998                format!(
999                    "Calculated tx hash mismatch. Expected: {:?}, Found: {:?}",
1000                    expected_hash, calculated_tx_hash
1001                )
1002                .as_str(),
1003                (),
1004            );
1005        }
1006        block_tx_envelopes.push(tx_envelope);
1007
1008        // get the logs for the tx, if the tx failed or doesn't have logs, use an empty vec
1009        let (eip658value, tx_logs) = match status {
1010            0 => (Eip658Value::Eip658(false), &Vec::<Log>::new()),
1011            1 => (
1012                Eip658Value::Eip658(true),
1013                logs_by_block_num_and_tx_idx
1014                    .get(&(block_num, tx_idx))
1015                    .unwrap_or(&empty_logs),
1016            ),
1017            _ => return Err(anyhow!("Invalid tx status: {}", status)), // Other chains may have different status values
1018        };
1019
1020        // create a receipt object
1021        let receipt = Receipt {
1022            status: eip658value,
1023            cumulative_gas_used,
1024            logs: tx_logs.to_vec(),
1025        };
1026
1027        // calculate the receipt bloom with the receipt object
1028        let receiptwithbloom = receipt.with_bloom();
1029        // create an expected bloom object from the logs_bloom column value
1030        let expected_bloom = Bloom::new(
1031            logs_bloom
1032                .try_into()
1033                .unwrap_or_else(|_| ic.report("logs bloom must be 256 bytes", [0; 256])),
1034        );
1035        // validate logs bloom
1036        if receiptwithbloom.logs_bloom != expected_bloom {
1037            ic.report(
1038                format!(
1039                    "Calculated logs bloom mismatch.\nExpected {:?},\nFound: {:?}",
1040                    expected_bloom, receiptwithbloom.logs_bloom
1041                )
1042                .as_str(),
1043                (),
1044            );
1045        }
1046        // create a receipt envelope object from the receipt_with_bloom object
1047        let receipt_envelope = match tx_type {
1048            0 => ReceiptEnvelope::Legacy(receiptwithbloom),
1049            1 => ReceiptEnvelope::Eip2930(receiptwithbloom),
1050            2 => ReceiptEnvelope::Eip1559(receiptwithbloom),
1051            3 => ReceiptEnvelope::Eip4844(receiptwithbloom),
1052            4 => ReceiptEnvelope::Eip7702(receiptwithbloom),
1053            _ => return Err(anyhow!("Invalid tx type: {}", tx_type)),
1054        };
1055        // add the receipt envelope to the block tx receipts vec
1056        block_tx_receipts.push(receipt_envelope);
1057    }
1058
1059    // calculate the transactions root for the last block
1060    let transactions_root = calculate_transaction_root(&block_tx_envelopes);
1061    // calculate the receipt root for the last block, and store it in the mapping
1062    let receipt_root = calculate_receipt_root(&block_tx_receipts);
1063    transactions_root_by_block_num_mapping.insert(current_block_num, transactions_root);
1064    receipts_root_by_block_num_mapping.insert(current_block_num, receipt_root);
1065    ic.set_context(DataContext::default());
1066
1067    // COMPARE TRANSACTION AND RECEIPTS ROOT WITH EXPECTED VALUES
1068
1069    // extract the block numbers, receipts roots and transactions roots from the blocks table
1070    let block_array = extract_block_cols_as_arrays(blocks)?;
1071
1072    // create a map of block numbers to receipts roots
1073    let mut expected_transactions_and_receipts_root_by_block_num_mapping =
1074        BTreeMap::<u64, (FixedBytes<32>, FixedBytes<32>)>::new();
1075
1076    // iterate over the block numbers and receipts roots
1077    for ((block_num_opt, block_receipts_root_opt), block_transactions_root_opt) in block_array
1078        .number
1079        .iter()
1080        .zip(block_array.receipts_root.iter())
1081        .zip(block_array.transactions_root.iter())
1082    {
1083        // cast the values to the expected types
1084        let block_num = block_num_opt.unwrap();
1085        ic.set_context(DataContext::new(
1086            "Blocks".to_string(),
1087            format!("Block_num {}", block_num),
1088        ));
1089        let receipts_root = block_receipts_root_opt
1090            .unwrap_or_else(|| ic.report("receipts root is None", &[0; 32]))
1091            .try_into()
1092            .unwrap_or_else(|_| ic.report("receipts root is invalid", FixedBytes::ZERO));
1093        let transactions_root = block_transactions_root_opt
1094            .unwrap_or_else(|| ic.report("transactions root is None", &[0; 32]))
1095            .try_into()
1096            .unwrap_or_else(|_| ic.report("transactions root is invalid", FixedBytes::ZERO));
1097        // insert the values into the maps
1098        expected_transactions_and_receipts_root_by_block_num_mapping
1099            .insert(block_num, (receipts_root, transactions_root));
1100    }
1101
1102    for (block_num, (expected_receipts_root, expected_transactions_root)) in
1103        expected_transactions_and_receipts_root_by_block_num_mapping.iter()
1104    {
1105        // null root is the root of an empty block
1106        let null_root = <FixedBytes<32> as alloy_primitives::hex::FromHex>::from_hex(
1107            "56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
1108        )
1109        .unwrap();
1110        if expected_receipts_root == expected_transactions_root
1111            && expected_receipts_root == &null_root
1112        {
1113            continue;
1114        }
1115        let calculated_receipts_root = receipts_root_by_block_num_mapping
1116            .get(block_num)
1117            .unwrap_or_else(|| {
1118                ic.report(
1119                    "There is no calculated receipts root for this block",
1120                    &FixedBytes::ZERO,
1121                )
1122            });
1123        let calculated_transactions_root = transactions_root_by_block_num_mapping
1124            .get(block_num)
1125            .unwrap_or_else(|| {
1126                ic.report(
1127                    "There is no calculated transactions root for this block",
1128                    &FixedBytes::ZERO,
1129                )
1130            });
1131        if expected_receipts_root != calculated_receipts_root {
1132            ic.report(
1133                format!(
1134                    "Receipts root mismatch. Expected: {:?}, Found: {:?}",
1135                    expected_receipts_root, calculated_receipts_root
1136                )
1137                .as_str(),
1138                (),
1139            );
1140        };
1141        if expected_transactions_root != calculated_transactions_root {
1142            ic.report(
1143                format!(
1144                    "Transactions root mismatch. Expected: {:?}, Found: {:?}",
1145                    expected_transactions_root, calculated_transactions_root
1146                )
1147                .as_str(),
1148                (),
1149            );
1150        }
1151    }
1152
1153    Ok(())
1154}
1155
1156fn extract_log_cols_as_arrays(logs: &RecordBatch) -> Result<LogArray> {
1157    let log_block_nums = logs
1158        .column_by_name("block_number")
1159        .context("get log block num col")?
1160        .as_any()
1161        .downcast_ref::<UInt64Array>()
1162        .context("get log block num col as u64")?;
1163
1164    let log_log_idx = logs
1165        .column_by_name("log_index")
1166        .context("get log log_index column")?
1167        .as_any()
1168        .downcast_ref::<UInt64Array>()
1169        .context("get log log_index col as u64")?;
1170
1171    let log_tx_idx = logs
1172        .column_by_name("transaction_index")
1173        .context("get tx index column")?
1174        .as_any()
1175        .downcast_ref::<UInt64Array>()
1176        .context("get tx index col as u64")?;
1177
1178    let log_address = logs
1179        .column_by_name("address")
1180        .context("get address column")?
1181        .as_any()
1182        .downcast_ref::<BinaryArray>()
1183        .context("get address as binary")?;
1184
1185    let log_topic0 = logs
1186        .column_by_name("topic0")
1187        .context("get topic0 column")?
1188        .as_any()
1189        .downcast_ref::<BinaryArray>()
1190        .context("get topic0 as binary")?;
1191
1192    let log_topic1 = logs
1193        .column_by_name("topic1")
1194        .context("get topic1 column")?
1195        .as_any()
1196        .downcast_ref::<BinaryArray>()
1197        .context("get topic1 as binary")?;
1198
1199    let log_topic2 = logs
1200        .column_by_name("topic2")
1201        .context("get topic2 column")?
1202        .as_any()
1203        .downcast_ref::<BinaryArray>()
1204        .context("get topic2 as binary")?;
1205
1206    let log_topic3 = logs
1207        .column_by_name("topic3")
1208        .context("get topic3 column")?
1209        .as_any()
1210        .downcast_ref::<BinaryArray>()
1211        .context("get topic3 as binary")?;
1212
1213    let log_data = logs
1214        .column_by_name("data")
1215        .context("get data column")?
1216        .as_any()
1217        .downcast_ref::<BinaryArray>()
1218        .context("get data as binary")?;
1219
1220    let log_array = LogArray {
1221        block_number: log_block_nums,
1222        tx_index: log_tx_idx,
1223        log_index: log_log_idx,
1224        address: log_address,
1225        topic0: log_topic0,
1226        topic1: log_topic1,
1227        topic2: log_topic2,
1228        topic3: log_topic3,
1229        data: log_data,
1230    };
1231
1232    // Return the extracted data
1233    Ok(log_array)
1234}
1235
1236fn extract_transaction_cols_as_arrays(transactions: &RecordBatch) -> Result<TransactionsArray> {
1237    let tx_block_nums = transactions
1238        .column_by_name("block_number")
1239        .context("get tx block num col")?
1240        .as_any()
1241        .downcast_ref::<UInt64Array>()
1242        .context("get tx block num col as u64")?;
1243
1244    let tx_gas_limit = transactions
1245        .column_by_name("gas")
1246        .context("get tx gas limit column")?
1247        .as_any()
1248        .downcast_ref::<Decimal256Array>()
1249        .context("get tx gas limit col as decimal256")?;
1250
1251    let tx_gas_price = transactions
1252        .column_by_name("gas_price")
1253        .context("get tx gas price column")?
1254        .as_any()
1255        .downcast_ref::<Decimal256Array>()
1256        .context("get tx gas price col as decimal256")?;
1257
1258    let tx_hash = transactions
1259        .column_by_name("hash")
1260        .context("get tx hash column")?
1261        .as_any()
1262        .downcast_ref::<BinaryArray>()
1263        .context("get tx hash col as binary")?;
1264
1265    let tx_input = transactions
1266        .column_by_name("input")
1267        .context("get tx input column")?
1268        .as_any()
1269        .downcast_ref::<BinaryArray>()
1270        .context("get tx input col as binary")?;
1271
1272    let tx_nonce = transactions
1273        .column_by_name("nonce")
1274        .context("get tx nonce column")?
1275        .as_any()
1276        .downcast_ref::<Decimal256Array>()
1277        .context("get tx nonce col as binary")?;
1278
1279    let tx_to = transactions
1280        .column_by_name("to")
1281        .context("get tx to column")?
1282        .as_any()
1283        .downcast_ref::<BinaryArray>()
1284        .context("get tx to col as binary")?;
1285
1286    let tx_tx_idx = transactions
1287        .column_by_name("transaction_index")
1288        .context("get tx index column")?
1289        .as_any()
1290        .downcast_ref::<UInt64Array>()
1291        .context("get tx index col as u64")?;
1292
1293    let tx_value = transactions
1294        .column_by_name("value")
1295        .context("get tx value column")?
1296        .as_any()
1297        .downcast_ref::<Decimal256Array>()
1298        .context("get tx value col as decimal256")?;
1299
1300    let tx_v = transactions
1301        .column_by_name("v")
1302        .context("get tx v column")?
1303        .as_any()
1304        .downcast_ref::<UInt8Array>()
1305        .context("get tx v col as u8")?;
1306
1307    let tx_r = transactions
1308        .column_by_name("r")
1309        .context("get tx r column")?
1310        .as_any()
1311        .downcast_ref::<BinaryArray>()
1312        .context("get tx r col as binary")?;
1313
1314    let tx_s = transactions
1315        .column_by_name("s")
1316        .context("get tx s column")?
1317        .as_any()
1318        .downcast_ref::<BinaryArray>()
1319        .context("get tx s col as binary")?;
1320
1321    let tx_max_priority_fee_per_gas = transactions
1322        .column_by_name("max_priority_fee_per_gas")
1323        .context("get tx max priority fee per gas column")?
1324        .as_any()
1325        .downcast_ref::<Decimal256Array>()
1326        .context("get tx max priority fee per gas col as decimal256")?;
1327
1328    let tx_max_fee_per_gas = transactions
1329        .column_by_name("max_fee_per_gas")
1330        .context("get tx max fee per gas column")?
1331        .as_any()
1332        .downcast_ref::<Decimal256Array>()
1333        .context("get tx max fee per gas col as decimal256")?;
1334
1335    let tx_chain_id = transactions
1336        .column_by_name("chain_id")
1337        .context("get tx chain id column")?
1338        .as_any()
1339        .downcast_ref::<Decimal256Array>()
1340        .context("get tx chain id col as decimal256")?;
1341
1342    let tx_cumulative_gas_used = transactions
1343        .column_by_name("cumulative_gas_used")
1344        .context("get tx cumulative gas used column")?
1345        .as_any()
1346        .downcast_ref::<Decimal256Array>()
1347        .context("get tx cumulative gas used col as decimal256")?;
1348
1349    let tx_contract_address = transactions
1350        .column_by_name("contract_address")
1351        .context("get tx contract address column")?
1352        .as_any()
1353        .downcast_ref::<BinaryArray>()
1354        .context("get tx contract address col as binary")?;
1355
1356    let tx_logs_bloom: &GenericByteArray<GenericBinaryType<i32>> = transactions
1357        .column_by_name("logs_bloom")
1358        .context("get tx logs bloom column")?
1359        .as_any()
1360        .downcast_ref::<BinaryArray>()
1361        .context("get tx logs bloom col as binary")?;
1362
1363    let tx_type = transactions
1364        .column_by_name("type")
1365        .context("get tx type column")?
1366        .as_any()
1367        .downcast_ref::<UInt8Array>()
1368        .context("get tx type col as u8")?;
1369
1370    let tx_status = transactions
1371        .column_by_name("status")
1372        .context("get tx status column")?
1373        .as_any()
1374        .downcast_ref::<UInt8Array>()
1375        .context("get tx status col as u8")?;
1376
1377    let tx_sighash = transactions
1378        .column_by_name("sighash")
1379        .context("get tx sig hash column")?
1380        .as_any()
1381        .downcast_ref::<BinaryArray>()
1382        .context("get tx sig hash col as binary")?;
1383
1384    let tx_access_list = transactions
1385        .column_by_name("access_list")
1386        .context("get tx access list column")?
1387        .as_any()
1388        .downcast_ref::<GenericListArray<i32>>()
1389        .context("get tx access list col as binary")?;
1390
1391    let tx_max_fee_per_blob_gas = transactions
1392        .column_by_name("max_fee_per_blob_gas")
1393        .context("get tx max fee per blob gas column")?
1394        .as_any()
1395        .downcast_ref::<Decimal256Array>()
1396        .context("get tx max fee per blob gas col as decimal256")?;
1397
1398    let tx_blob_versioned_hashes = transactions
1399        .column_by_name("blob_versioned_hashes")
1400        .context("get tx blob versioned hashes column")?
1401        .as_any()
1402        .downcast_ref::<GenericListArray<i32>>()
1403        .context("get tx blob versioned hashes col as binary")?;
1404
1405    let tx_array = TransactionsArray {
1406        block_number: tx_block_nums,
1407        gas_limit: tx_gas_limit,
1408        gas_price: tx_gas_price,
1409        hash: tx_hash,
1410        input: tx_input,
1411        nonce: tx_nonce,
1412        to: tx_to,
1413        tx_index: tx_tx_idx,
1414        value: tx_value,
1415        v: tx_v,
1416        r: tx_r,
1417        s: tx_s,
1418        max_priority_fee_per_gas: tx_max_priority_fee_per_gas,
1419        max_fee_per_gas: tx_max_fee_per_gas,
1420        chain_id: tx_chain_id,
1421        cumulative_gas_used: tx_cumulative_gas_used,
1422        contract_address: tx_contract_address,
1423        logs_bloom: tx_logs_bloom,
1424        tx_type,
1425        status: tx_status,
1426        sighash: tx_sighash,
1427        access_list: tx_access_list,
1428        max_fee_per_blob_gas: tx_max_fee_per_blob_gas,
1429        blob_versioned_hashes: tx_blob_versioned_hashes,
1430    };
1431
1432    Ok(tx_array)
1433}
1434
1435fn extract_block_cols_as_arrays(blocks: &RecordBatch) -> Result<BlockArray> {
1436    let block_numbers = blocks
1437        .column_by_name("number")
1438        .context("get block number column")?
1439        .as_any()
1440        .downcast_ref::<UInt64Array>()
1441        .context("get block number column as u64")?;
1442
1443    let block_receipts_root = blocks
1444        .column_by_name("receipts_root")
1445        .context("get block receipts_root column")?
1446        .as_any()
1447        .downcast_ref::<BinaryArray>()
1448        .context("get block receipts_root as binary")?;
1449
1450    let block_transactions_root = blocks
1451        .column_by_name("transactions_root")
1452        .context("get block transactions_root column")?
1453        .as_any()
1454        .downcast_ref::<BinaryArray>()
1455        .context("get block transactions_root as binary")?;
1456
1457    let block_array = BlockArray {
1458        number: block_numbers,
1459        receipts_root: block_receipts_root,
1460        transactions_root: block_transactions_root,
1461    };
1462
1463    Ok(block_array)
1464}
1465
1466fn convert_arrow_array_into_access_list(array: &StructArray) -> Result<AccessList> {
1467    let mut items = Vec::with_capacity(array.len());
1468
1469    // Extract the child arrays
1470    let address_array = array
1471        .column_by_name("address")
1472        .context("Missing 'address' field")?
1473        .as_any()
1474        .downcast_ref::<BinaryArray>()
1475        .context("'address' field is not a BinaryArray")?;
1476
1477    let storage_keys_array = array
1478        .column_by_name("storage_keys")
1479        .context("Missing 'storage_keys' field")?
1480        .as_any()
1481        .downcast_ref::<ListArray>()
1482        .context("'storage_keys' field is not a ListArray")?;
1483
1484    let storage_keys_values = storage_keys_array
1485        .values()
1486        .as_any()
1487        .downcast_ref::<BinaryArray>()
1488        .context("Storage keys values are not a BinaryArray")?;
1489
1490    let storage_keys_offsets = storage_keys_array.offsets();
1491
1492    // Convert each row to an AccessListItem
1493    for i in 0..array.len() {
1494        if array.is_null(i) {
1495            continue;
1496        }
1497
1498        // Skip if either is null - they must be both valid or both null
1499        if address_array.is_null(i) || storage_keys_array.is_null(i) {
1500            continue;
1501        }
1502
1503        // Get address - convert binary to Address (20 bytes)
1504        let bytes = address_array.value(i);
1505        if bytes.len() != 20 {
1506            return Err(anyhow::anyhow!("Invalid address length: {}", bytes.len()));
1507        }
1508        let mut addr = [0u8; 20];
1509        addr.copy_from_slice(bytes);
1510        let address = Address::new(addr);
1511
1512        // Get storage keys - convert each binary to B256 (32 bytes)
1513        let mut storage_keys = Vec::new();
1514        let start_offset = *storage_keys_offsets.get(i).expect("start offset is null") as usize;
1515        let end_offset = *storage_keys_offsets.get(i + 1).expect("end offset is null") as usize;
1516
1517        for j in start_offset..end_offset {
1518            let bytes = storage_keys_values.value(j);
1519
1520            // Make sure we have the correct length for B256
1521            if bytes.len() != 32 {
1522                return Err(anyhow::anyhow!("Invalid B256 length: {}", bytes.len()));
1523            }
1524
1525            let mut b256 = [0u8; 32];
1526            b256.copy_from_slice(bytes);
1527            storage_keys.push(B256::new(b256));
1528        }
1529
1530        items.push(AccessListItem {
1531            address,
1532            storage_keys,
1533        });
1534    }
1535
1536    Ok(AccessList(items))
1537}
1538
1539fn convert_binary_array_32_to_fixed_hashes(binary_array: &BinaryArray) -> Vec<FixedBytes<32>> {
1540    binary_array
1541        .iter()
1542        .map(|bytes| {
1543            let bytes = bytes.expect("blob versioned hash cannot be null");
1544            let mut hash = [0u8; 32];
1545            hash.copy_from_slice(bytes);
1546            FixedBytes::<32>::new(hash)
1547        })
1548        .collect()
1549}