1mod issues_collector;
2
3pub use issues_collector::{DataContext, IssueCollector, IssueCollectorConfig, ReportFormat};
4
5use std::collections::BTreeMap;
6
7use anyhow::{anyhow, Context, Result};
8use arrow::array::{
9 Array, AsArray, BinaryArray, Decimal256Array, GenericByteArray, GenericListArray, ListArray,
10 PrimitiveArray, StructArray, UInt64Array, UInt8Array,
11};
12use arrow::datatypes::{i256, Decimal256Type, GenericBinaryType, UInt8Type};
13use arrow::{datatypes::UInt64Type, record_batch::RecordBatch};
14
15use alloy_consensus::proofs::{calculate_receipt_root, calculate_transaction_root};
16use alloy_consensus::{
17 Eip658Value, Receipt, ReceiptEnvelope, SignableTransaction, TxEip1559, TxEip2930, TxEip4844,
18 TxEip4844Variant, TxEnvelope, TxLegacy,
19};
20use alloy_eips::eip2930::{AccessList, AccessListItem};
21use alloy_primitives::{
22 Address, Bloom, Bytes, FixedBytes, Log, PrimitiveSignature, TxKind, Uint, B256, U256,
23};
24
25struct LogArray<'a> {
26 block_number: &'a PrimitiveArray<UInt64Type>,
27 tx_index: &'a PrimitiveArray<UInt64Type>,
28 log_index: &'a PrimitiveArray<UInt64Type>,
29 address: &'a GenericByteArray<GenericBinaryType<i32>>,
30 topic0: &'a GenericByteArray<GenericBinaryType<i32>>,
31 topic1: &'a GenericByteArray<GenericBinaryType<i32>>,
32 topic2: &'a GenericByteArray<GenericBinaryType<i32>>,
33 topic3: &'a GenericByteArray<GenericBinaryType<i32>>,
34 data: &'a GenericByteArray<GenericBinaryType<i32>>,
35}
36
37struct TransactionsArray<'a> {
38 block_number: &'a PrimitiveArray<UInt64Type>,
39 gas_limit: &'a PrimitiveArray<Decimal256Type>,
40 gas_price: &'a PrimitiveArray<Decimal256Type>,
41 hash: &'a GenericByteArray<GenericBinaryType<i32>>,
42 input: &'a GenericByteArray<GenericBinaryType<i32>>,
43 nonce: &'a PrimitiveArray<Decimal256Type>,
44 to: &'a GenericByteArray<GenericBinaryType<i32>>,
45 tx_index: &'a PrimitiveArray<UInt64Type>,
46 value: &'a PrimitiveArray<Decimal256Type>,
47 v: &'a PrimitiveArray<UInt8Type>,
48 r: &'a GenericByteArray<GenericBinaryType<i32>>,
49 s: &'a GenericByteArray<GenericBinaryType<i32>>,
50 max_priority_fee_per_gas: &'a PrimitiveArray<Decimal256Type>,
51 max_fee_per_gas: &'a PrimitiveArray<Decimal256Type>,
52 chain_id: &'a PrimitiveArray<Decimal256Type>,
53 cumulative_gas_used: &'a PrimitiveArray<Decimal256Type>,
54 contract_address: &'a GenericByteArray<GenericBinaryType<i32>>,
55 logs_bloom: &'a GenericByteArray<GenericBinaryType<i32>>,
56 tx_type: &'a PrimitiveArray<UInt8Type>,
57 status: &'a PrimitiveArray<UInt8Type>,
58 sighash: &'a GenericByteArray<GenericBinaryType<i32>>,
59 access_list: &'a GenericListArray<i32>,
60 max_fee_per_blob_gas: &'a PrimitiveArray<Decimal256Type>,
61 blob_versioned_hashes: &'a GenericListArray<i32>,
62}
63
64struct BlockArray<'a> {
65 number: &'a PrimitiveArray<UInt64Type>,
66 receipts_root: &'a GenericByteArray<GenericBinaryType<i32>>,
67 transactions_root: &'a GenericByteArray<GenericBinaryType<i32>>,
68}
69
70pub fn validate_block_data(
81 blocks: &RecordBatch,
82 transactions: &RecordBatch,
83 logs: &RecordBatch,
84 traces: &RecordBatch,
85) -> Result<()> {
86 let block_numbers = blocks
87 .column_by_name("number")
88 .context("get block number column")?
89 .as_any()
90 .downcast_ref::<UInt64Array>()
91 .context("get block number column as u64")?;
92
93 if block_numbers.null_count() > 0 {
94 return Err(anyhow!("block.number column can't have nulls"));
95 }
96
97 let first_block_num = block_numbers
98 .iter()
99 .next()
100 .map(Option::unwrap)
101 .unwrap_or_default();
102 let mut current_bn = first_block_num;
103 for bn in block_numbers.iter().skip(1) {
104 let bn = bn.unwrap();
105 if current_bn + 1 != bn {
106 return Err(anyhow!(
107 "block.number column is not consistent. {} != {}",
108 current_bn + 1,
109 bn
110 ));
111 }
112 current_bn = bn;
113 }
114
115 let block_hashes = blocks
116 .column_by_name("hash")
117 .context("get block hash column")?
118 .as_any()
119 .downcast_ref::<BinaryArray>()
120 .context("get block hash as binary array")?;
121
122 let block_parent_hashes = blocks
123 .column_by_name("parent_hash")
124 .context("get block parent_hash column")?
125 .as_any()
126 .downcast_ref::<BinaryArray>()
127 .context("get block parent_hash as binary array")?;
128
129 let get_block_hash = |block_num: u64| -> Option<&[u8]> {
130 let pos = usize::try_from(block_num.checked_sub(first_block_num)?).unwrap();
131 if pos < block_hashes.len() {
132 Some(block_hashes.value(pos))
133 } else {
134 None
135 }
136 };
137
138 if block_hashes.null_count() > 0 {
139 return Err(anyhow!("block.hash column can't have nulls"));
140 }
141
142 if block_parent_hashes.null_count() > 0 {
143 return Err(anyhow!("block.parent_has column can't have nulls"));
144 }
145
146 for (expected_parent_hash, parent_hash) in
147 block_hashes.iter().zip(block_parent_hashes.iter().skip(1))
148 {
149 let expected_parent_hash = expected_parent_hash.unwrap();
150 let parent_hash = parent_hash.unwrap();
151 if expected_parent_hash != parent_hash {
152 return Err(anyhow!(
153 "bad parent hash found. expected {}, found {}",
154 faster_hex::hex_string(expected_parent_hash),
155 faster_hex::hex_string(parent_hash)
156 ));
157 }
158 }
159
160 validate_block_hashes(get_block_hash, transactions).context("validate tx block hashes")?;
161 validate_block_hashes(get_block_hash, logs).context("validate log block hashes")?;
162 validate_block_hashes(get_block_hash, traces).context("validate trace block hashes")?;
163
164 let mut tx_hash_mapping = vec![Vec::<[u8; 32]>::with_capacity(200); block_numbers.len()];
167
168 let tx_hashes = transactions
169 .column_by_name("hash")
170 .context("get tx hash col")?
171 .as_any()
172 .downcast_ref::<BinaryArray>()
173 .context("tx hash col as binary")?;
174 let tx_block_nums = transactions
175 .column_by_name("block_number")
176 .context("get tx block num col")?
177 .as_any()
178 .downcast_ref::<UInt64Array>()
179 .context("get tx block num col as u64")?;
180 let tx_indices = transactions
181 .column_by_name("transaction_index")
182 .context("get tx index column")?
183 .as_any()
184 .downcast_ref::<UInt64Array>()
185 .context("get tx index col as u64")?;
186
187 if tx_hashes.null_count() > 0 {
188 return Err(anyhow!("tx hash column can't have nulls"));
189 }
190 if tx_block_nums.null_count() > 0 {
191 return Err(anyhow!("tx block number column can't have nulls"));
192 }
193 if tx_indices.null_count() > 0 {
194 return Err(anyhow!("tx index column can't have nulls"));
195 }
196
197 let mut expected_tx_index = 0;
198 let mut current_block_num = first_block_num;
199
200 for ((tx_hash, tx_bn), tx_idx) in tx_hashes
201 .iter()
202 .zip(tx_block_nums.iter())
203 .zip(tx_indices.iter())
204 {
205 let tx_hash = tx_hash.unwrap();
206 let tx_bn = tx_bn.unwrap();
207 let tx_idx = tx_idx.unwrap();
208
209 if tx_bn != current_block_num {
210 if tx_bn < current_block_num {
211 return Err(anyhow!(
212 "found wrong ordering in tx block numbers after block num {}",
213 current_block_num
214 ));
215 }
216
217 current_block_num = tx_bn;
218 expected_tx_index = 0;
219 }
220
221 if tx_idx != expected_tx_index {
222 return Err(anyhow!(
223 "found unexpected tx index at the start of block {}",
224 current_block_num
225 ));
226 }
227 expected_tx_index += 1;
228
229 let block_pos = tx_bn
230 .checked_sub(first_block_num)
231 .with_context(|| format!("unexpected block num {} in transactions", tx_bn))?;
232 let mappings = tx_hash_mapping
233 .get_mut(usize::try_from(block_pos).unwrap())
234 .unwrap();
235
236 assert_eq!(mappings.len(), usize::try_from(tx_idx).unwrap());
237
238 if tx_hash.len() != 32 {
239 return Err(anyhow!("found bad tx hash at {},{}", tx_bn, tx_idx));
240 }
241
242 mappings.push(tx_hash.try_into().unwrap());
243 }
244
245 validate_transaction_hashes(first_block_num, &tx_hash_mapping, logs, "transaction_index")
246 .context("check tx hashes in logs")?;
247 validate_transaction_hashes(
248 first_block_num,
249 &tx_hash_mapping,
250 traces,
251 "transaction_position",
252 )
253 .context("check tx hashes in traces")?;
254
255 let log_block_nums = logs
258 .column_by_name("block_number")
259 .context("get log block num col")?
260 .as_any()
261 .downcast_ref::<UInt64Array>()
262 .context("get log block num col as u64")?;
263 let log_indices = logs
264 .column_by_name("log_index")
265 .context("get log index column")?
266 .as_any()
267 .downcast_ref::<UInt64Array>()
268 .context("get log index col as u64")?;
269
270 if log_block_nums.null_count() > 0 {
271 return Err(anyhow!("log block number column can't have nulls"));
272 }
273 if log_indices.null_count() > 0 {
274 return Err(anyhow!("log index column can't have nulls"));
275 }
276
277 let mut expected_log_index = 0;
278 let mut current_block_num = first_block_num;
279
280 for (log_idx, log_bn) in log_indices.iter().zip(log_block_nums.iter()) {
281 let log_idx = log_idx.unwrap();
282 let log_bn = log_bn.unwrap();
283
284 if log_bn != current_block_num {
285 if log_bn < current_block_num {
286 return Err(anyhow!(
287 "found wrong ordering in log block numbers after block num {}",
288 current_block_num
289 ));
290 }
291
292 expected_log_index = 0;
293 current_block_num = log_bn;
294 }
295
296 if log_idx != expected_log_index {
297 return Err(anyhow!(
298 "found unexpected log index, expected {},{} but got {} for index",
299 log_bn,
300 expected_log_index,
301 log_idx
302 ));
303 }
304 expected_log_index += 1;
305 }
306
307 let trace_block_nums = traces
310 .column_by_name("block_number")
311 .context("get trace block num col")?
312 .as_any()
313 .downcast_ref::<UInt64Array>()
314 .context("get trace block num col as u64")?;
315 let trace_tx_indices = traces
316 .column_by_name("transaction_position")
317 .context("get trace tx index column")?
318 .as_any()
319 .downcast_ref::<UInt64Array>()
320 .context("get trace tx index col as u64")?;
321
322 if trace_block_nums.null_count() > 0 {
323 return Err(anyhow!("log block number column can't have nulls"));
324 }
325
326 let mut current_tx_pos = 0;
327 let mut current_block_num = first_block_num;
328
329 for (trace_bn, trace_tx_pos) in trace_block_nums.iter().zip(trace_tx_indices.iter()) {
330 let prev_bn = current_block_num;
331
332 let trace_bn = trace_bn.unwrap();
333
334 if trace_bn != current_block_num {
335 if trace_bn < current_block_num {
336 return Err(anyhow!(
337 "found wrong ordering in trace block numbers after block num {}",
338 current_block_num
339 ));
340 }
341
342 current_tx_pos = 0;
343 current_block_num = trace_bn;
344 }
345
346 let tx_pos = match trace_tx_pos {
347 Some(x) => x,
348 None => continue,
350 };
351
352 if tx_pos < current_tx_pos {
353 return Err(anyhow!(
354 "found bad tx position ordering after {},{}",
355 prev_bn,
356 current_tx_pos
357 ));
358 }
359 current_tx_pos = tx_pos;
360 }
361
362 Ok(())
363}
364
365fn validate_block_hashes<'a, F: Fn(u64) -> Option<&'a [u8]>>(
366 get_block_hash: F,
367 data: &RecordBatch,
368) -> Result<()> {
369 let block_hashes = data
370 .column_by_name("block_hash")
371 .context("get block hash column")?
372 .as_any()
373 .downcast_ref::<BinaryArray>()
374 .context("block hash col as binary")?;
375 let block_numbers = data
376 .column_by_name("block_number")
377 .context("get block number column")?
378 .as_any()
379 .downcast_ref::<UInt64Array>()
380 .context("block number as u64")?;
381
382 if block_hashes.null_count() > 0 {
383 return Err(anyhow!("block hash column can't have nulls"));
384 }
385
386 if block_numbers.null_count() > 0 {
387 return Err(anyhow!("block number column can't have nulls"));
388 }
389
390 for (bn, hash) in block_numbers.iter().zip(block_hashes.iter()) {
391 let bn = bn.unwrap();
392 let hash = hash.unwrap();
393
394 let expected = match get_block_hash(bn) {
395 Some(h) => h,
396 None => {
397 return Err(anyhow!("couldn't find expected hash for block {}", bn));
398 }
399 };
400
401 if expected != hash {
402 return Err(anyhow!(
403 "block hash mismatch at block {}. expected {} got {}",
404 bn,
405 faster_hex::hex_string(expected),
406 faster_hex::hex_string(hash)
407 ));
408 }
409 }
410
411 Ok(())
412}
413
414fn validate_transaction_hashes(
415 first_block_num: u64,
416 expected_tx_hashes: &[Vec<[u8; 32]>],
417 data: &RecordBatch,
418 tx_index_col_name: &str,
419) -> Result<()> {
420 let tx_indices = data
421 .column_by_name(tx_index_col_name)
422 .context("get tx index column")?
423 .as_any()
424 .downcast_ref::<UInt64Array>()
425 .context("get tx index col as u64")?;
426 let block_numbers = data
427 .column_by_name("block_number")
428 .context("get block number column")?
429 .as_any()
430 .downcast_ref::<UInt64Array>()
431 .context("block number as u64")?;
432 let tx_hashes = data
433 .column_by_name("transaction_hash")
434 .context("get tx hash column")?
435 .as_any()
436 .downcast_ref::<BinaryArray>()
437 .context("get tx hash col as binary")?;
438
439 if block_numbers.null_count() > 0 {
440 return Err(anyhow!("block number column can't have nulls"));
441 }
442
443 for ((tx_idx, tx_hash), bn) in tx_indices
444 .iter()
445 .zip(tx_hashes.iter())
446 .zip(block_numbers.iter())
447 {
448 if let Some(tx_idx) = tx_idx {
450 let bn = bn.unwrap();
451 let tx_hash = match tx_hash {
452 Some(h) => h,
453 None => {
454 return Err(anyhow!("tx hash no found for tx {},{}", bn, tx_idx));
455 }
456 };
457
458 let block_i = match bn.checked_sub(first_block_num) {
459 Some(i) => usize::try_from(i).unwrap(),
460 None => return Err(anyhow!("bad block num: {}", bn)),
461 };
462
463 let expected_tx_hash = expected_tx_hashes
464 .get(block_i)
465 .with_context(|| format!("block {} not found in given data", bn))?
466 .get(usize::try_from(tx_idx).unwrap())
467 .with_context(|| format!("tx hash data for tx {},{} not found", bn, tx_idx))?;
468
469 if expected_tx_hash != tx_hash {
470 return Err(anyhow!(
471 "tx hash mismatch for tx {},{}. Expected {}, Found {}",
472 bn,
473 tx_idx,
474 faster_hex::hex_string(expected_tx_hash),
475 faster_hex::hex_string(tx_hash)
476 ));
477 }
478 }
479 }
480
481 Ok(())
482}
483
484#[rustfmt::skip]
485pub fn validate_root_hashes(
486 blocks: &RecordBatch,
487 logs: &RecordBatch,
488 transactions: &RecordBatch,
489 issues_collector: &mut IssueCollector,
490) -> Result<()> {
491 let ic = issues_collector;
492
493 let log_array = extract_log_cols_as_arrays(logs)?;
496
497 let mut current_block_num = log_array.block_number.value(0);
499 let mut current_tx_idx = log_array.log_index.value(0);
500 let mut tx_logs = Vec::<Log>::with_capacity(20);
502 let mut logs_by_block_num_and_tx_idx = BTreeMap::<(u64, u64), Vec<Log>>::new();
504
505 let log_iterators = log_array
506 .block_number
507 .iter()
508 .zip(log_array.log_index.iter())
509 .zip(log_array.tx_index.iter())
510 .zip(log_array.address.iter())
511 .zip(log_array.topic0.iter())
512 .zip(log_array.topic1.iter())
513 .zip(log_array.topic2.iter())
514 .zip(log_array.topic3.iter())
515 .zip(log_array.data.iter());
516
517 for (
519 (
520 (
521 (
522 ((((block_nums_opt, log_idx_opt), tx_idx_opt), address_opt), topic0_opt),
523 topic1_opt,
524 ),
525 topic2_opt,
526 ),
527 topic3_opt,
528 ),
529 data_opt,
530 ) in log_iterators
531 {
532 let block_num = block_nums_opt.unwrap(); let tx_idx = tx_idx_opt.unwrap(); let log_idx = log_idx_opt.unwrap_or_else(|| {
536 ic.report_with_context(
537 "log_idx is None",
538 DataContext::new(
539 "Logs".to_string(),
540 format!("Block_num {}, Tx_idx {}", block_num, tx_idx),
541 ),
542 99999,
543 )
544 });
545 ic.set_context(DataContext::new(
546 "Logs".to_string(),
547 format!(
548 "Block_num {}, Tx_idx {}, Log_idx {}",
549 block_num, tx_idx, log_idx
550 ),
551 ));
552 let address = address_opt
553 .unwrap_or_else(|| ic.report("address is None", &[0; 20]))
554 .try_into()
555 .unwrap_or_else(|_| ic.report("address is invalid", Address::ZERO));
556 let topic0: FixedBytes<32> = topic0_opt
557 .unwrap_or_else(|| ic.report("topic0 is None", &[0; 32]))
558 .try_into()
559 .unwrap_or_else(|_| ic.report("topic0 is invalid", FixedBytes::<32>::new([0; 32])));
560 let topic1: Option<FixedBytes<32>> = topic1_opt.and_then(|t| {
561 t.try_into()
562 .ok()
563 .or_else(|| ic.report("topic1 is invalid", None))
564 });
565 let topic2: Option<FixedBytes<32>> = topic2_opt.and_then(|t| {
566 t.try_into()
567 .ok()
568 .or_else(|| ic.report("topic2 is invalid", None))
569 });
570 let topic3: Option<FixedBytes<32>> = topic3_opt.and_then(|t| {
571 t.try_into()
572 .ok()
573 .or_else(|| ic.report("topic3 is invalid", None))
574 });
575 let log_data = data_opt.unwrap_or_else(|| ic.report("log_data is None", &[0; 0]));
576 let log_data = Bytes::copy_from_slice(log_data);
577
578 let topics: Vec<_> = [Some(topic0), topic1, topic2, topic3]
580 .into_iter()
581 .flatten()
582 .collect();
583
584 if block_num != current_block_num || tx_idx != current_tx_idx {
586 if !tx_logs.is_empty() {
587 logs_by_block_num_and_tx_idx
588 .insert((current_block_num, current_tx_idx), tx_logs.clone());
589 tx_logs.clear();
590 }
591 current_block_num = block_num;
592 current_tx_idx = tx_idx;
593 }
594
595 let log = Log::new(address, topics, log_data).expect("log is invalid");
597 tx_logs.push(log);
598 }
599 logs_by_block_num_and_tx_idx.insert((current_block_num, current_tx_idx), tx_logs);
601 ic.set_context(DataContext::default());
602
603 let tx_array = extract_transaction_cols_as_arrays(transactions)?;
605 let mut current_block_num = tx_array.block_number.value(0);
606 let mut transactions_root_by_block_num_mapping = BTreeMap::<u64, FixedBytes<32>>::new();
608 let mut receipts_root_by_block_num_mapping = BTreeMap::<u64, FixedBytes<32>>::new();
610 let mut block_tx_envelopes = Vec::<TxEnvelope>::with_capacity(200);
612 let mut block_tx_receipts = Vec::<ReceiptEnvelope>::with_capacity(200);
614 let empty_logs = Vec::<Log>::new();
616
617 let tx_iterators = tx_array
618 .block_number
619 .iter()
620 .zip(tx_array.gas_limit.iter())
621 .zip(tx_array.gas_price.iter())
622 .zip(tx_array.hash.iter())
623 .zip(tx_array.input.iter())
624 .zip(tx_array.nonce.iter())
625 .zip(tx_array.to.iter())
626 .zip(tx_array.tx_index.iter())
627 .zip(tx_array.value.iter())
628 .zip(tx_array.v.iter())
629 .zip(tx_array.r.iter())
630 .zip(tx_array.s.iter())
631 .zip(tx_array.max_priority_fee_per_gas.iter())
632 .zip(tx_array.max_fee_per_gas.iter())
633 .zip(tx_array.chain_id.iter())
634 .zip(tx_array.cumulative_gas_used.iter())
635 .zip(tx_array.contract_address.iter())
636 .zip(tx_array.logs_bloom.iter())
637 .zip(tx_array.tx_type.iter())
638 .zip(tx_array.status.iter())
639 .zip(tx_array.sighash.iter())
640 .zip(tx_array.access_list.iter())
641 .zip(tx_array.max_fee_per_blob_gas.iter())
642 .zip(tx_array.blob_versioned_hashes.iter());
643
644 for (((((((((((((((((((((((
646 tx_block_nums_opt
647 , tx_gas_limit_opt)
648 , tx_gas_price_opt)
649 , tx_hash_opt)
650 , tx_input_opt)
651 , tx_nonce_opt)
652 , tx_to_opt)
653 , tx_tx_idx_opt)
654 , tx_value_opt)
655 , tx_v_opt)
656 , tx_r_opt)
657 , tx_s_opt)
658 , tx_max_priority_fee_per_gas_opt)
659 , tx_max_fee_per_gas_opt)
660 , tx_chain_id_opt)
661 , tx_cumulative_gas_used_opt)
662 , tx_contract_address_opt)
663 , tx_logs_bloom_opt)
664 , tx_type_opt)
665 , tx_status_opt)
666 , tx_sighash_opt)
667 , tx_access_list_opt)
668 , tx_max_fee_per_blob_gas_opt)
669 , tx_blob_versioned_hashes_opt) in tx_iterators {
670 let cont_row_ctx = match (tx_block_nums_opt, tx_tx_idx_opt) {
672 (Some(block_num), Some(tx_idx)) => {
673 format!("Block_num {}, Tx_idx {}", block_num, tx_idx)
674 }
675 _ => "Undefined".to_string(),
676 };
677
678 let expected_hash: FixedBytes<32> = match tx_hash_opt {
680 None => {
682 ic.set_context(DataContext::new("Transactions".to_string(), cont_row_ctx));
683 ic.report("tx_hash is None", FixedBytes::<32>::new([0; 32]))
684 }
685 Some(hash) => match hash.try_into() {
686 Ok(hash) => {
688 ic.set_context(DataContext::new(
689 "Transactions".to_string(),
690 format!("Tx_hash {}", hash),
691 ));
692 hash
693 }
694 Err(_) => {
695 ic.set_context(DataContext::new("Transactions".to_string(), cont_row_ctx));
696 ic.report("tx_hash is invalid", FixedBytes::<32>::new([0; 32]))
697 }
698 },
699 };
700
701 let block_num = tx_block_nums_opt.unwrap(); let gas_limit = u64::try_from(
704 tx_gas_limit_opt
705 .unwrap_or_else(|| ic.report("gas_limit is None", i256::ZERO))
706 .as_i128(),
707 )
708 .unwrap();
709 let gas_price = u128::try_from(
710 tx_gas_price_opt
711 .unwrap_or_else(|| ic.report("gas_price is None", i256::ZERO))
712 .as_i128(),
713 )
714 .unwrap();
715 let input = tx_input_opt.unwrap_or_else(|| ic.report("input is None", &[0; 0]));
716 let input = Bytes::copy_from_slice(input);
717 let nonce = u64::try_from(
718 tx_nonce_opt
719 .unwrap_or_else(|| ic.report("nonce is None", i256::ZERO))
720 .as_i128(),
721 )
722 .unwrap();
723 let to: Option<Address> = tx_to_opt.and_then(|a| {
724 a.try_into()
725 .ok()
726 .or_else(|| ic.report("to is invalid", None))
727 });
728 let tx_idx = tx_tx_idx_opt.unwrap_or_else(|| ic.report("tx_idx is None", 0));
729 let value = U256::try_from(
730 tx_value_opt
731 .unwrap_or_else(|| ic.report("value is None", i256::ZERO))
732 .as_i128(),
733 )
734 .unwrap();
735 let chain_id = tx_chain_id_opt.and_then(|id| {
736 u64::try_from(id.as_i128())
737 .ok()
738 .or_else(|| ic.report("chain_id is invalid", None))
739 });
740 let r_id: u8 = (chain_id.unwrap_or(1) * 2 + 35)
742 .try_into()
743 .expect("invalid chain_id, produced signiture v is out of range");
744 let v = tx_v_opt.unwrap_or_else(|| ic.report("v is None", 0));
745 let v = if v == 0 || v == 27 || v == r_id {
746 false
747 } else if v == 1 || v == 28 || v == r_id + 1 {
748 true
749 } else {
750 return Err(anyhow!("invalid v"));
751 };
752 let r: Uint<256, 4> =
753 U256::try_from_be_slice(tx_r_opt.unwrap_or_else(|| ic.report("r is None", &[0; 32])))
754 .expect("invalid r");
755 let s: Uint<256, 4> =
756 U256::try_from_be_slice(tx_s_opt.unwrap_or_else(|| ic.report("s is None", &[0; 32])))
757 .expect("invalid s");
758 let max_priority_fee_per_gas: Option<u128> =
759 tx_max_priority_fee_per_gas_opt.and_then(|value| {
760 value
761 .as_i128()
762 .try_into()
763 .ok()
764 .or_else(|| ic.report("max_priority_fee_per_gas is invalid", None))
765 });
766 let max_fee_per_gas: Option<u128> = tx_max_fee_per_gas_opt.and_then(|value| {
767 value
768 .as_i128()
769 .try_into()
770 .ok()
771 .or_else(|| ic.report("max_fee_per_gas is invalid", None))
772 });
773 let cumulative_gas_used = u64::try_from(
774 tx_cumulative_gas_used_opt
775 .unwrap_or_else(|| ic.report("cumulative_gas_used is None", i256::ZERO))
776 .as_i128(),
777 )
778 .unwrap();
779 let contract_address: Option<Address> = tx_contract_address_opt.and_then(|a| {
780 a.try_into()
781 .ok()
782 .or_else(|| ic.report("contract_address is invalid", None))
783 });
784 let logs_bloom =
785 tx_logs_bloom_opt.unwrap_or_else(|| ic.report("logs_bloom is None", &[0; 256]));
786 let status = tx_status_opt.unwrap_or_else(|| ic.report("status is None", 0));
787 let expected_sighash = tx_sighash_opt;
788 let access_list: Option<AccessList> = tx_access_list_opt.map(|array| {
789 let access_list_items = array.as_struct_opt().expect("access list is not a struct");
790 convert_arrow_array_into_access_list(access_list_items)
791 .expect("access list is invalid")
792 });
793 let max_fee_per_blob_gas: Option<u128> = tx_max_fee_per_blob_gas_opt.and_then(|value| {
794 value
795 .as_i128()
796 .try_into()
797 .ok()
798 .or_else(|| ic.report("max_fee_per_blob_gas is invalid", None))
799 });
800 let blob_versioned_hashes: Option<Vec<FixedBytes<32>>> =
801 tx_blob_versioned_hashes_opt.map(|array| {
802 let binary_array = array
803 .as_any()
804 .downcast_ref::<BinaryArray>()
805 .expect("blob_versioned_hashes must be a BinaryArray");
806 convert_binary_array_32_to_fixed_hashes(binary_array)
807 });
808 let tx_type = tx_type_opt.unwrap_or(if access_list.is_some() {
809 if max_priority_fee_per_gas.is_some() {
810 if max_fee_per_blob_gas.is_some() {
811 3
812 } else {
813 2
814 }
815 } else {
816 1
817 }
818 } else {
819 0
820 });
821
822 if block_num != current_block_num {
824 if !block_tx_receipts.is_empty() {
825 let receipt_root = calculate_receipt_root(&block_tx_receipts);
826 let transactions_root = calculate_transaction_root(&block_tx_envelopes);
827 receipts_root_by_block_num_mapping.insert(current_block_num, receipt_root);
828 transactions_root_by_block_num_mapping.insert(current_block_num, transactions_root);
829 block_tx_receipts.clear();
830 block_tx_envelopes.clear();
831 }
832 current_block_num = block_num;
833 }
834
835 match expected_sighash {
837 Some(expected_sighash) => {
838 let sighash: [u8; 4] = input[..4].try_into().unwrap_or_else(|_| {
839 ic.report(
840 "input must be at least 4 bytes long for a tx with a sighash",
841 [0; 4],
842 )
843 });
844 if sighash != expected_sighash {
845 ic.report(
846 format!(
847 "sighash mismatch. Expected:\n{:?},\nFound:\n{:?}",
848 expected_sighash, sighash
849 )
850 .as_str(),
851 (),
852 );
853 }
854 }
855 None => {
856 if input.len() > 4 {
857 ic.report("sighash is None, with a non-zero input", ());
858 }
859 }
860 }
861
862 let tx_kind = match contract_address {
864 None => TxKind::Call(to.unwrap_or_else(|| {
865 ic.report(
866 "to is None, while contract_address is also None",
867 Address::ZERO,
868 )
869 })),
870 Some(_) => TxKind::Create,
871 };
872 let primitive_sig = PrimitiveSignature::new(r, s, v);
873
874 let tx_envelope = match tx_type {
876 0 => {
877 let tx = TxLegacy {
878 chain_id,
879 nonce,
880 gas_price,
881 gas_limit,
882 to: tx_kind,
883 value,
884 input,
885 };
886 let signed_tx = tx.into_signed(primitive_sig);
887 TxEnvelope::Legacy(signed_tx)
888 }
889 1 => {
890 let tx = TxEip2930 {
891 chain_id: chain_id.unwrap_or_else(|| {
892 ic.report("chain_id is None, for a Eip2930 transaction", 0)
893 }),
894 nonce,
895 gas_price,
896 gas_limit,
897 to: tx_kind,
898 value,
899 access_list: access_list.unwrap_or_else(|| {
900 ic.report(
901 "access list is None, for a Eip2930 transaction",
902 AccessList::default(),
903 )
904 }),
905 input,
906 };
907 let signed_tx = tx.into_signed(primitive_sig);
908 TxEnvelope::Eip2930(signed_tx)
909 }
910 2 => {
911 let tx = TxEip1559 {
912 chain_id: chain_id.unwrap_or_else(|| {
913 ic.report("chain_id is None, for a Eip1559 transaction", 0)
914 }),
915 nonce,
916 gas_limit,
917 max_fee_per_gas: max_fee_per_gas.unwrap_or_else(|| {
918 ic.report("max fee per gas is None, for a Eip1559 transaction", 0)
919 }),
920 max_priority_fee_per_gas: max_priority_fee_per_gas.unwrap_or_else(|| {
921 ic.report(
922 "max priority fee per gas is None, for a Eip1559 transaction",
923 0,
924 )
925 }),
926 to: tx_kind,
927 value,
928 access_list: access_list.unwrap_or_else(|| {
929 ic.report(
930 "access list is None, for a Eip1559 transaction",
931 AccessList::default(),
932 )
933 }),
934 input,
935 };
936 let signed_tx = tx.into_signed(primitive_sig);
937 TxEnvelope::Eip1559(signed_tx)
938 }
939 3 => {
940 let tx = TxEip4844Variant::TxEip4844(TxEip4844 {
941 chain_id: chain_id.unwrap_or_else(|| {
942 ic.report("chain_id is None, for a Eip4844 transaction", 0)
943 }),
944 nonce,
945 gas_limit,
946 max_fee_per_gas: max_fee_per_gas.unwrap_or_else(|| {
947 ic.report("max fee per gas is None, for a Eip4844 transaction", 0)
948 }),
949 max_priority_fee_per_gas: max_priority_fee_per_gas.unwrap_or_else(|| {
950 ic.report(
951 "max priority fee per gas is None, for a Eip4844 transaction",
952 0,
953 )
954 }),
955 to: to.unwrap_or_else(|| {
956 ic.report("to is None, for a Eip4844 transaction", Address::ZERO)
957 }),
958 value,
959 access_list: access_list.unwrap_or_else(|| {
960 ic.report(
961 "access list is None, for a Eip4844 transaction",
962 AccessList::default(),
963 )
964 }),
965 blob_versioned_hashes: blob_versioned_hashes.unwrap_or_else(|| {
966 ic.report(
967 "blob versioned hashes is None, for a Eip4844 transaction",
968 Vec::<FixedBytes<32>>::new(),
969 )
970 }),
971 max_fee_per_blob_gas: max_fee_per_blob_gas.unwrap_or_else(|| {
972 ic.report("max fee per blob gas is None, for a Eip4844 transaction", 0)
973 }),
974 input,
975 });
976 let signed_tx = tx.into_signed(primitive_sig);
977 TxEnvelope::Eip4844(signed_tx)
978 }
979 _ => return Err(anyhow!("Invalid tx type: {}", tx_type)),
992 };
993
994 let calculated_tx_hash = tx_envelope.tx_hash();
996 if calculated_tx_hash != &expected_hash {
997 ic.report(
998 format!(
999 "Calculated tx hash mismatch. Expected: {:?}, Found: {:?}",
1000 expected_hash, calculated_tx_hash
1001 )
1002 .as_str(),
1003 (),
1004 );
1005 }
1006 block_tx_envelopes.push(tx_envelope);
1007
1008 let (eip658value, tx_logs) = match status {
1010 0 => (Eip658Value::Eip658(false), &Vec::<Log>::new()),
1011 1 => (
1012 Eip658Value::Eip658(true),
1013 logs_by_block_num_and_tx_idx
1014 .get(&(block_num, tx_idx))
1015 .unwrap_or(&empty_logs),
1016 ),
1017 _ => return Err(anyhow!("Invalid tx status: {}", status)), };
1019
1020 let receipt = Receipt {
1022 status: eip658value,
1023 cumulative_gas_used,
1024 logs: tx_logs.to_vec(),
1025 };
1026
1027 let receiptwithbloom = receipt.with_bloom();
1029 let expected_bloom = Bloom::new(
1031 logs_bloom
1032 .try_into()
1033 .unwrap_or_else(|_| ic.report("logs bloom must be 256 bytes", [0; 256])),
1034 );
1035 if receiptwithbloom.logs_bloom != expected_bloom {
1037 ic.report(
1038 format!(
1039 "Calculated logs bloom mismatch.\nExpected {:?},\nFound: {:?}",
1040 expected_bloom, receiptwithbloom.logs_bloom
1041 )
1042 .as_str(),
1043 (),
1044 );
1045 }
1046 let receipt_envelope = match tx_type {
1048 0 => ReceiptEnvelope::Legacy(receiptwithbloom),
1049 1 => ReceiptEnvelope::Eip2930(receiptwithbloom),
1050 2 => ReceiptEnvelope::Eip1559(receiptwithbloom),
1051 3 => ReceiptEnvelope::Eip4844(receiptwithbloom),
1052 4 => ReceiptEnvelope::Eip7702(receiptwithbloom),
1053 _ => return Err(anyhow!("Invalid tx type: {}", tx_type)),
1054 };
1055 block_tx_receipts.push(receipt_envelope);
1057 }
1058
1059 let transactions_root = calculate_transaction_root(&block_tx_envelopes);
1061 let receipt_root = calculate_receipt_root(&block_tx_receipts);
1063 transactions_root_by_block_num_mapping.insert(current_block_num, transactions_root);
1064 receipts_root_by_block_num_mapping.insert(current_block_num, receipt_root);
1065 ic.set_context(DataContext::default());
1066
1067 let block_array = extract_block_cols_as_arrays(blocks)?;
1071
1072 let mut expected_transactions_and_receipts_root_by_block_num_mapping =
1074 BTreeMap::<u64, (FixedBytes<32>, FixedBytes<32>)>::new();
1075
1076 for ((block_num_opt, block_receipts_root_opt), block_transactions_root_opt) in block_array
1078 .number
1079 .iter()
1080 .zip(block_array.receipts_root.iter())
1081 .zip(block_array.transactions_root.iter())
1082 {
1083 let block_num = block_num_opt.unwrap();
1085 ic.set_context(DataContext::new(
1086 "Blocks".to_string(),
1087 format!("Block_num {}", block_num),
1088 ));
1089 let receipts_root = block_receipts_root_opt
1090 .unwrap_or_else(|| ic.report("receipts root is None", &[0; 32]))
1091 .try_into()
1092 .unwrap_or_else(|_| ic.report("receipts root is invalid", FixedBytes::ZERO));
1093 let transactions_root = block_transactions_root_opt
1094 .unwrap_or_else(|| ic.report("transactions root is None", &[0; 32]))
1095 .try_into()
1096 .unwrap_or_else(|_| ic.report("transactions root is invalid", FixedBytes::ZERO));
1097 expected_transactions_and_receipts_root_by_block_num_mapping
1099 .insert(block_num, (receipts_root, transactions_root));
1100 }
1101
1102 for (block_num, (expected_receipts_root, expected_transactions_root)) in
1103 expected_transactions_and_receipts_root_by_block_num_mapping.iter()
1104 {
1105 let null_root = <FixedBytes<32> as alloy_primitives::hex::FromHex>::from_hex(
1107 "56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
1108 )
1109 .unwrap();
1110 if expected_receipts_root == expected_transactions_root
1111 && expected_receipts_root == &null_root
1112 {
1113 continue;
1114 }
1115 let calculated_receipts_root = receipts_root_by_block_num_mapping
1116 .get(block_num)
1117 .unwrap_or_else(|| {
1118 ic.report(
1119 "There is no calculated receipts root for this block",
1120 &FixedBytes::ZERO,
1121 )
1122 });
1123 let calculated_transactions_root = transactions_root_by_block_num_mapping
1124 .get(block_num)
1125 .unwrap_or_else(|| {
1126 ic.report(
1127 "There is no calculated transactions root for this block",
1128 &FixedBytes::ZERO,
1129 )
1130 });
1131 if expected_receipts_root != calculated_receipts_root {
1132 ic.report(
1133 format!(
1134 "Receipts root mismatch. Expected: {:?}, Found: {:?}",
1135 expected_receipts_root, calculated_receipts_root
1136 )
1137 .as_str(),
1138 (),
1139 );
1140 };
1141 if expected_transactions_root != calculated_transactions_root {
1142 ic.report(
1143 format!(
1144 "Transactions root mismatch. Expected: {:?}, Found: {:?}",
1145 expected_transactions_root, calculated_transactions_root
1146 )
1147 .as_str(),
1148 (),
1149 );
1150 }
1151 }
1152
1153 Ok(())
1154}
1155
1156fn extract_log_cols_as_arrays(logs: &RecordBatch) -> Result<LogArray> {
1157 let log_block_nums = logs
1158 .column_by_name("block_number")
1159 .context("get log block num col")?
1160 .as_any()
1161 .downcast_ref::<UInt64Array>()
1162 .context("get log block num col as u64")?;
1163
1164 let log_log_idx = logs
1165 .column_by_name("log_index")
1166 .context("get log log_index column")?
1167 .as_any()
1168 .downcast_ref::<UInt64Array>()
1169 .context("get log log_index col as u64")?;
1170
1171 let log_tx_idx = logs
1172 .column_by_name("transaction_index")
1173 .context("get tx index column")?
1174 .as_any()
1175 .downcast_ref::<UInt64Array>()
1176 .context("get tx index col as u64")?;
1177
1178 let log_address = logs
1179 .column_by_name("address")
1180 .context("get address column")?
1181 .as_any()
1182 .downcast_ref::<BinaryArray>()
1183 .context("get address as binary")?;
1184
1185 let log_topic0 = logs
1186 .column_by_name("topic0")
1187 .context("get topic0 column")?
1188 .as_any()
1189 .downcast_ref::<BinaryArray>()
1190 .context("get topic0 as binary")?;
1191
1192 let log_topic1 = logs
1193 .column_by_name("topic1")
1194 .context("get topic1 column")?
1195 .as_any()
1196 .downcast_ref::<BinaryArray>()
1197 .context("get topic1 as binary")?;
1198
1199 let log_topic2 = logs
1200 .column_by_name("topic2")
1201 .context("get topic2 column")?
1202 .as_any()
1203 .downcast_ref::<BinaryArray>()
1204 .context("get topic2 as binary")?;
1205
1206 let log_topic3 = logs
1207 .column_by_name("topic3")
1208 .context("get topic3 column")?
1209 .as_any()
1210 .downcast_ref::<BinaryArray>()
1211 .context("get topic3 as binary")?;
1212
1213 let log_data = logs
1214 .column_by_name("data")
1215 .context("get data column")?
1216 .as_any()
1217 .downcast_ref::<BinaryArray>()
1218 .context("get data as binary")?;
1219
1220 let log_array = LogArray {
1221 block_number: log_block_nums,
1222 tx_index: log_tx_idx,
1223 log_index: log_log_idx,
1224 address: log_address,
1225 topic0: log_topic0,
1226 topic1: log_topic1,
1227 topic2: log_topic2,
1228 topic3: log_topic3,
1229 data: log_data,
1230 };
1231
1232 Ok(log_array)
1234}
1235
1236fn extract_transaction_cols_as_arrays(transactions: &RecordBatch) -> Result<TransactionsArray> {
1237 let tx_block_nums = transactions
1238 .column_by_name("block_number")
1239 .context("get tx block num col")?
1240 .as_any()
1241 .downcast_ref::<UInt64Array>()
1242 .context("get tx block num col as u64")?;
1243
1244 let tx_gas_limit = transactions
1245 .column_by_name("gas")
1246 .context("get tx gas limit column")?
1247 .as_any()
1248 .downcast_ref::<Decimal256Array>()
1249 .context("get tx gas limit col as decimal256")?;
1250
1251 let tx_gas_price = transactions
1252 .column_by_name("gas_price")
1253 .context("get tx gas price column")?
1254 .as_any()
1255 .downcast_ref::<Decimal256Array>()
1256 .context("get tx gas price col as decimal256")?;
1257
1258 let tx_hash = transactions
1259 .column_by_name("hash")
1260 .context("get tx hash column")?
1261 .as_any()
1262 .downcast_ref::<BinaryArray>()
1263 .context("get tx hash col as binary")?;
1264
1265 let tx_input = transactions
1266 .column_by_name("input")
1267 .context("get tx input column")?
1268 .as_any()
1269 .downcast_ref::<BinaryArray>()
1270 .context("get tx input col as binary")?;
1271
1272 let tx_nonce = transactions
1273 .column_by_name("nonce")
1274 .context("get tx nonce column")?
1275 .as_any()
1276 .downcast_ref::<Decimal256Array>()
1277 .context("get tx nonce col as binary")?;
1278
1279 let tx_to = transactions
1280 .column_by_name("to")
1281 .context("get tx to column")?
1282 .as_any()
1283 .downcast_ref::<BinaryArray>()
1284 .context("get tx to col as binary")?;
1285
1286 let tx_tx_idx = transactions
1287 .column_by_name("transaction_index")
1288 .context("get tx index column")?
1289 .as_any()
1290 .downcast_ref::<UInt64Array>()
1291 .context("get tx index col as u64")?;
1292
1293 let tx_value = transactions
1294 .column_by_name("value")
1295 .context("get tx value column")?
1296 .as_any()
1297 .downcast_ref::<Decimal256Array>()
1298 .context("get tx value col as decimal256")?;
1299
1300 let tx_v = transactions
1301 .column_by_name("v")
1302 .context("get tx v column")?
1303 .as_any()
1304 .downcast_ref::<UInt8Array>()
1305 .context("get tx v col as u8")?;
1306
1307 let tx_r = transactions
1308 .column_by_name("r")
1309 .context("get tx r column")?
1310 .as_any()
1311 .downcast_ref::<BinaryArray>()
1312 .context("get tx r col as binary")?;
1313
1314 let tx_s = transactions
1315 .column_by_name("s")
1316 .context("get tx s column")?
1317 .as_any()
1318 .downcast_ref::<BinaryArray>()
1319 .context("get tx s col as binary")?;
1320
1321 let tx_max_priority_fee_per_gas = transactions
1322 .column_by_name("max_priority_fee_per_gas")
1323 .context("get tx max priority fee per gas column")?
1324 .as_any()
1325 .downcast_ref::<Decimal256Array>()
1326 .context("get tx max priority fee per gas col as decimal256")?;
1327
1328 let tx_max_fee_per_gas = transactions
1329 .column_by_name("max_fee_per_gas")
1330 .context("get tx max fee per gas column")?
1331 .as_any()
1332 .downcast_ref::<Decimal256Array>()
1333 .context("get tx max fee per gas col as decimal256")?;
1334
1335 let tx_chain_id = transactions
1336 .column_by_name("chain_id")
1337 .context("get tx chain id column")?
1338 .as_any()
1339 .downcast_ref::<Decimal256Array>()
1340 .context("get tx chain id col as decimal256")?;
1341
1342 let tx_cumulative_gas_used = transactions
1343 .column_by_name("cumulative_gas_used")
1344 .context("get tx cumulative gas used column")?
1345 .as_any()
1346 .downcast_ref::<Decimal256Array>()
1347 .context("get tx cumulative gas used col as decimal256")?;
1348
1349 let tx_contract_address = transactions
1350 .column_by_name("contract_address")
1351 .context("get tx contract address column")?
1352 .as_any()
1353 .downcast_ref::<BinaryArray>()
1354 .context("get tx contract address col as binary")?;
1355
1356 let tx_logs_bloom: &GenericByteArray<GenericBinaryType<i32>> = transactions
1357 .column_by_name("logs_bloom")
1358 .context("get tx logs bloom column")?
1359 .as_any()
1360 .downcast_ref::<BinaryArray>()
1361 .context("get tx logs bloom col as binary")?;
1362
1363 let tx_type = transactions
1364 .column_by_name("type")
1365 .context("get tx type column")?
1366 .as_any()
1367 .downcast_ref::<UInt8Array>()
1368 .context("get tx type col as u8")?;
1369
1370 let tx_status = transactions
1371 .column_by_name("status")
1372 .context("get tx status column")?
1373 .as_any()
1374 .downcast_ref::<UInt8Array>()
1375 .context("get tx status col as u8")?;
1376
1377 let tx_sighash = transactions
1378 .column_by_name("sighash")
1379 .context("get tx sig hash column")?
1380 .as_any()
1381 .downcast_ref::<BinaryArray>()
1382 .context("get tx sig hash col as binary")?;
1383
1384 let tx_access_list = transactions
1385 .column_by_name("access_list")
1386 .context("get tx access list column")?
1387 .as_any()
1388 .downcast_ref::<GenericListArray<i32>>()
1389 .context("get tx access list col as binary")?;
1390
1391 let tx_max_fee_per_blob_gas = transactions
1392 .column_by_name("max_fee_per_blob_gas")
1393 .context("get tx max fee per blob gas column")?
1394 .as_any()
1395 .downcast_ref::<Decimal256Array>()
1396 .context("get tx max fee per blob gas col as decimal256")?;
1397
1398 let tx_blob_versioned_hashes = transactions
1399 .column_by_name("blob_versioned_hashes")
1400 .context("get tx blob versioned hashes column")?
1401 .as_any()
1402 .downcast_ref::<GenericListArray<i32>>()
1403 .context("get tx blob versioned hashes col as binary")?;
1404
1405 let tx_array = TransactionsArray {
1406 block_number: tx_block_nums,
1407 gas_limit: tx_gas_limit,
1408 gas_price: tx_gas_price,
1409 hash: tx_hash,
1410 input: tx_input,
1411 nonce: tx_nonce,
1412 to: tx_to,
1413 tx_index: tx_tx_idx,
1414 value: tx_value,
1415 v: tx_v,
1416 r: tx_r,
1417 s: tx_s,
1418 max_priority_fee_per_gas: tx_max_priority_fee_per_gas,
1419 max_fee_per_gas: tx_max_fee_per_gas,
1420 chain_id: tx_chain_id,
1421 cumulative_gas_used: tx_cumulative_gas_used,
1422 contract_address: tx_contract_address,
1423 logs_bloom: tx_logs_bloom,
1424 tx_type,
1425 status: tx_status,
1426 sighash: tx_sighash,
1427 access_list: tx_access_list,
1428 max_fee_per_blob_gas: tx_max_fee_per_blob_gas,
1429 blob_versioned_hashes: tx_blob_versioned_hashes,
1430 };
1431
1432 Ok(tx_array)
1433}
1434
1435fn extract_block_cols_as_arrays(blocks: &RecordBatch) -> Result<BlockArray> {
1436 let block_numbers = blocks
1437 .column_by_name("number")
1438 .context("get block number column")?
1439 .as_any()
1440 .downcast_ref::<UInt64Array>()
1441 .context("get block number column as u64")?;
1442
1443 let block_receipts_root = blocks
1444 .column_by_name("receipts_root")
1445 .context("get block receipts_root column")?
1446 .as_any()
1447 .downcast_ref::<BinaryArray>()
1448 .context("get block receipts_root as binary")?;
1449
1450 let block_transactions_root = blocks
1451 .column_by_name("transactions_root")
1452 .context("get block transactions_root column")?
1453 .as_any()
1454 .downcast_ref::<BinaryArray>()
1455 .context("get block transactions_root as binary")?;
1456
1457 let block_array = BlockArray {
1458 number: block_numbers,
1459 receipts_root: block_receipts_root,
1460 transactions_root: block_transactions_root,
1461 };
1462
1463 Ok(block_array)
1464}
1465
1466fn convert_arrow_array_into_access_list(array: &StructArray) -> Result<AccessList> {
1467 let mut items = Vec::with_capacity(array.len());
1468
1469 let address_array = array
1471 .column_by_name("address")
1472 .context("Missing 'address' field")?
1473 .as_any()
1474 .downcast_ref::<BinaryArray>()
1475 .context("'address' field is not a BinaryArray")?;
1476
1477 let storage_keys_array = array
1478 .column_by_name("storage_keys")
1479 .context("Missing 'storage_keys' field")?
1480 .as_any()
1481 .downcast_ref::<ListArray>()
1482 .context("'storage_keys' field is not a ListArray")?;
1483
1484 let storage_keys_values = storage_keys_array
1485 .values()
1486 .as_any()
1487 .downcast_ref::<BinaryArray>()
1488 .context("Storage keys values are not a BinaryArray")?;
1489
1490 let storage_keys_offsets = storage_keys_array.offsets();
1491
1492 for i in 0..array.len() {
1494 if array.is_null(i) {
1495 continue;
1496 }
1497
1498 if address_array.is_null(i) || storage_keys_array.is_null(i) {
1500 continue;
1501 }
1502
1503 let bytes = address_array.value(i);
1505 if bytes.len() != 20 {
1506 return Err(anyhow::anyhow!("Invalid address length: {}", bytes.len()));
1507 }
1508 let mut addr = [0u8; 20];
1509 addr.copy_from_slice(bytes);
1510 let address = Address::new(addr);
1511
1512 let mut storage_keys = Vec::new();
1514 let start_offset = *storage_keys_offsets.get(i).expect("start offset is null") as usize;
1515 let end_offset = *storage_keys_offsets.get(i + 1).expect("end offset is null") as usize;
1516
1517 for j in start_offset..end_offset {
1518 let bytes = storage_keys_values.value(j);
1519
1520 if bytes.len() != 32 {
1522 return Err(anyhow::anyhow!("Invalid B256 length: {}", bytes.len()));
1523 }
1524
1525 let mut b256 = [0u8; 32];
1526 b256.copy_from_slice(bytes);
1527 storage_keys.push(B256::new(b256));
1528 }
1529
1530 items.push(AccessListItem {
1531 address,
1532 storage_keys,
1533 });
1534 }
1535
1536 Ok(AccessList(items))
1537}
1538
1539fn convert_binary_array_32_to_fixed_hashes(binary_array: &BinaryArray) -> Vec<FixedBytes<32>> {
1540 binary_array
1541 .iter()
1542 .map(|bytes| {
1543 let bytes = bytes.expect("blob versioned hash cannot be null");
1544 let mut hash = [0u8; 32];
1545 hash.copy_from_slice(bytes);
1546 FixedBytes::<32>::new(hash)
1547 })
1548 .collect()
1549}