1use anyhow::Context;
7use arrow::{
8 array::{
9 Array, ArrayAccessor, BinaryArray, BooleanArray, RecordBatch, StringArray, UInt64Array,
10 UInt8Array,
11 },
12 datatypes::DataType,
13};
14use hypersync_format::{
15 AccessList, Address, Authorization, BlockNumber, Data, FixedSizeData, Hash, LogIndex, Quantity,
16 TransactionIndex, TransactionStatus, TransactionType, Withdrawal,
17};
18use hypersync_net_types::{BlockField, LogField, TraceField, TransactionField};
19
// Shorthand for results of column lookups, which fail with a `ColumnError`.
type ColResult<T> = std::result::Result<T, ColumnError>;
21
/// Error describing why a named column could not be used.
///
/// Displays as `column {col_name} {err}`, i.e. the column name followed by the
/// message of the contained [`ColumnErrorType`].
#[derive(Debug, thiserror::Error)]
#[error("column {col_name} {err}")]
pub struct ColumnError {
    /// Name of the column that was requested.
    pub col_name: &'static str,
    /// The specific failure that occurred for that column.
    pub err: ColumnErrorType,
}
31
32impl ColumnError {
33 fn not_found(col_name: &'static str) -> Self {
34 Self {
35 col_name,
36 err: ColumnErrorType::NotFound,
37 }
38 }
39
40 fn wrong_type(
41 col_name: &'static str,
42 expected_type: &'static str,
43 actual_type: DataType,
44 ) -> Self {
45 Self {
46 col_name,
47 err: ColumnErrorType::WrongType {
48 expected_type,
49 actual_type,
50 },
51 }
52 }
53}
54
/// The kind of failure recorded in a [`ColumnError`].
#[derive(Debug, thiserror::Error)]
pub enum ColumnErrorType {
    /// The record batch contains no column with the requested name.
    #[error("not found")]
    NotFound,
    /// The column exists but could not be downcast to the requested array type.
    #[error("expected to be of type {expected_type} but found {actual_type}")]
    WrongType {
        /// Name of the array type that was requested.
        expected_type: &'static str,
        /// Arrow data type the column actually has.
        actual_type: DataType,
    },
}
70
71fn column_as<'a, T: 'static>(batch: &'a RecordBatch, col_name: &'static str) -> ColResult<&'a T> {
72 match batch.column_by_name(col_name) {
73 None => Err(ColumnError::not_found(col_name)),
74 Some(c) => {
75 let Some(val) = c.as_any().downcast_ref::<T>() else {
76 let expected_type = std::any::type_name::<T>();
77 let actual_type = c.data_type().clone();
78 return Err(ColumnError::wrong_type(
79 col_name,
80 expected_type,
81 actual_type,
82 ));
83 };
84 Ok(val)
85 }
86 }
87}
88
/// Errors produced while reading a single value out of a record batch row.
#[derive(Debug, thiserror::Error)]
pub enum ReadError {
    /// A value that was read through a non-nullable accessor turned out to be null.
    #[error("value was unexpectedly null")]
    UnexpectedNull,
    /// The column was missing or had the wrong array type; displayed exactly
    /// like the wrapped [`ColumnError`].
    #[error(transparent)]
    ColumnError(#[from] ColumnError),
    /// The raw Arrow value could not be converted into the requested type.
    /// The wrapped error is rendered with its `Debug` formatting.
    #[error("{0:?}")]
    ConversionError(anyhow::Error),
}
102
/// A cursor onto one row of a [`RecordBatch`].
///
/// It pairs a borrowed batch with a row index; the typed readers in this file
/// wrap it and look values up by column name at that index.
pub struct ArrowRowReader<'a> {
    // Batch the row belongs to.
    batch: &'a RecordBatch,
    // Index of the row inside `batch` that every read targets.
    row_idx: usize,
}
111
112impl<'a> ArrowRowReader<'a> {
113 fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result<Self> {
116 let len = if let Some(first_column) = batch.columns().first() {
117 first_column.len()
118 } else {
119 0
120 };
121 if row_idx >= len {
122 anyhow::bail!("row index out of bounds");
123 }
124
125 Ok(Self { batch, row_idx })
126 }
127
128 fn get_nullable<Col, T>(&self, col_name: &'static str) -> Result<Option<T>, ReadError>
130 where
131 Col: 'static,
132 &'a Col: ArrayAccessor,
133
134 <&'a Col as ArrayAccessor>::Item: TryInto<T>,
135 <<&'a Col as ArrayAccessor>::Item as TryInto<T>>::Error:
136 std::error::Error + Send + Sync + 'static,
137 {
138 let arr = column_as::<Col>(self.batch, col_name)?;
139
140 if arr.is_valid(self.row_idx) {
141 let value = arr.value(self.row_idx);
142 let converted: T = value
143 .try_into()
144 .map_err(|e| ReadError::ConversionError(anyhow::Error::new(e)))?;
145 Ok(Some(converted))
146 } else {
147 Ok(None)
148 }
149 }
150
151 fn get<Col, T>(&self, col_name: &'static str) -> Result<T, ReadError>
153 where
154 Col: 'static,
155 &'a Col: ArrayAccessor,
156
157 <&'a Col as ArrayAccessor>::Item: TryInto<T>,
158 <<&'a Col as ArrayAccessor>::Item as TryInto<T>>::Error:
159 std::error::Error + Send + Sync + 'static,
160 {
161 match self.get_nullable::<Col, T>(col_name) {
162 Ok(Some(val)) => Ok(val),
163 Ok(None) => Err(ReadError::UnexpectedNull),
164 Err(e) => Err(e),
165 }
166 }
167}
168
/// Iterator that walks the rows of a [`RecordBatch`] and yields one reader of
/// type `R` per row.
pub struct ArrowRowIterator<'a, R> {
    // Batch being iterated.
    batch: &'a RecordBatch,
    // Index of the next row to yield.
    current_idx: usize,
    // Total number of rows, captured when the iterator was created.
    len: usize,
    // Records the reader type `R` without storing a value of it.
    phantom: std::marker::PhantomData<R>,
}
176
177impl<'a, R: From<ArrowRowReader<'a>>> ArrowRowIterator<'a, R> {
178 pub fn new(batch: &'a RecordBatch) -> Self {
180 let len = if let Some(first_column) = batch.columns().first() {
181 first_column.len()
182 } else {
183 0
184 };
185 Self {
186 batch,
187 current_idx: 0,
188 len,
189 phantom: std::marker::PhantomData,
190 }
191 }
192}
193
194impl<'a, R: From<ArrowRowReader<'a>>> Iterator for ArrowRowIterator<'a, R> {
195 type Item = R;
196
197 fn next(&mut self) -> Option<Self::Item> {
198 if self.current_idx < self.len {
199 let reader = ArrowRowReader {
200 batch: self.batch,
201 row_idx: self.current_idx,
202 };
203 self.current_idx += 1;
204 Some(reader.into())
205 } else {
206 None
207 }
208 }
209
210 fn size_hint(&self) -> (usize, Option<usize>) {
211 let remaining = self.len - self.current_idx;
212 (remaining, Some(remaining))
213 }
214}
215
216impl<'a, R: From<ArrowRowReader<'a>>> ExactSizeIterator for ArrowRowIterator<'a, R> {
217 fn len(&self) -> usize {
218 self.len - self.current_idx
219 }
220}
221
/// Typed accessor for one row of a record batch whose columns are named after
/// `LogField` variants.
pub struct LogReader<'a> {
    // Row cursor that performs the column lookups and conversions.
    inner: ArrowRowReader<'a>,
}
229
230impl<'a> From<ArrowRowReader<'a>> for LogReader<'a> {
231 fn from(inner: ArrowRowReader<'a>) -> Self {
232 Self { inner }
233 }
234}
235
/// Iterator over the rows of a record batch that yields a [`LogReader`] per row.
pub type LogIterator<'a> = ArrowRowIterator<'a, LogReader<'a>>;
238
/// Accessors for a log row.
///
/// Each accessor looks its column up by the string form of the matching
/// `LogField` variant. Accessors returning `Result<T, _>` fail with
/// `ReadError::UnexpectedNull` on a null cell; accessors returning
/// `Result<Option<T>, _>` report a null cell as `None`.
impl<'a> LogReader<'a> {
    /// Creates a reader for row `row_idx` of `batch`.
    ///
    /// # Errors
    ///
    /// Fails when `row_idx` is out of bounds for the batch.
    pub fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result<Self> {
        let inner = ArrowRowReader::new(batch, row_idx)?;
        Ok(Self { inner })
    }

    /// Returns an iterator yielding one `LogReader` per row of `batch`.
    pub fn iter(batch: &'a RecordBatch) -> LogIterator<'a> {
        LogIterator::new(batch)
    }

    /// Optional `LogField::Removed` column, read as a boolean.
    pub fn removed(&self) -> Result<Option<bool>, ReadError> {
        self.inner
            .get_nullable::<BooleanArray, bool>(LogField::Removed.as_ref())
    }

    /// Required `LogField::LogIndex` column, read from an unsigned 64-bit array.
    pub fn log_index(&self) -> Result<LogIndex, ReadError> {
        self.inner
            .get::<UInt64Array, LogIndex>(LogField::LogIndex.as_ref())
    }

    /// Required `LogField::TransactionIndex` column, read from an unsigned 64-bit array.
    pub fn transaction_index(&self) -> Result<TransactionIndex, ReadError> {
        self.inner
            .get::<UInt64Array, TransactionIndex>(LogField::TransactionIndex.as_ref())
    }

    /// Required `LogField::TransactionHash` column, read from a binary array.
    pub fn transaction_hash(&self) -> Result<Hash, ReadError> {
        self.inner
            .get::<BinaryArray, Hash>(LogField::TransactionHash.as_ref())
    }

    /// Required `LogField::BlockHash` column, read from a binary array.
    pub fn block_hash(&self) -> Result<Hash, ReadError> {
        self.inner
            .get::<BinaryArray, Hash>(LogField::BlockHash.as_ref())
    }

    /// Required `LogField::BlockNumber` column, read from an unsigned 64-bit array.
    pub fn block_number(&self) -> Result<BlockNumber, ReadError> {
        self.inner
            .get::<UInt64Array, BlockNumber>(LogField::BlockNumber.as_ref())
    }

    /// Required `LogField::Address` column, read from a binary array.
    pub fn address(&self) -> Result<Address, ReadError> {
        self.inner
            .get::<BinaryArray, Address>(LogField::Address.as_ref())
    }

    /// Optional `LogField::Topic0` column, read as 32 bytes of fixed-size data.
    pub fn topic0(&self) -> Result<Option<FixedSizeData<32>>, ReadError> {
        self.inner
            .get_nullable::<BinaryArray, FixedSizeData<32>>(LogField::Topic0.as_ref())
    }

    /// Optional `LogField::Topic1` column, read as 32 bytes of fixed-size data.
    pub fn topic1(&self) -> Result<Option<FixedSizeData<32>>, ReadError> {
        self.inner
            .get_nullable::<BinaryArray, FixedSizeData<32>>(LogField::Topic1.as_ref())
    }

    /// Optional `LogField::Topic2` column, read as 32 bytes of fixed-size data.
    pub fn topic2(&self) -> Result<Option<FixedSizeData<32>>, ReadError> {
        self.inner
            .get_nullable::<BinaryArray, FixedSizeData<32>>(LogField::Topic2.as_ref())
    }

    /// Optional `LogField::Topic3` column, read as 32 bytes of fixed-size data.
    pub fn topic3(&self) -> Result<Option<FixedSizeData<32>>, ReadError> {
        self.inner
            .get_nullable::<BinaryArray, FixedSizeData<32>>(LogField::Topic3.as_ref())
    }

    /// Required `LogField::Data` column, read from a binary array.
    pub fn data(&self) -> Result<Data, ReadError> {
        self.inner.get::<BinaryArray, Data>(LogField::Data.as_ref())
    }
}
323
/// Typed accessor for one row of a record batch whose columns are named after
/// `BlockField` variants.
pub struct BlockReader<'a> {
    // Row cursor that performs the column lookups and conversions.
    inner: ArrowRowReader<'a>,
}
331
332impl<'a> From<ArrowRowReader<'a>> for BlockReader<'a> {
333 fn from(inner: ArrowRowReader<'a>) -> Self {
334 Self { inner }
335 }
336}
337
/// Iterator over the rows of a record batch that yields a [`BlockReader`] per row.
pub type BlockIterator<'a> = ArrowRowIterator<'a, BlockReader<'a>>;
340
341impl<'a> BlockReader<'a> {
342 pub fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result<Self> {
345 let inner = ArrowRowReader::new(batch, row_idx)?;
346 Ok(Self { inner })
347 }
348
349 pub fn iter(batch: &'a RecordBatch) -> BlockIterator<'a> {
351 BlockIterator::new(batch)
352 }
353
354 pub fn number(&self) -> Result<u64, ReadError> {
356 self.inner
357 .get::<UInt64Array, _>(BlockField::Number.as_ref())
358 }
359
360 pub fn hash(&self) -> Result<Hash, ReadError> {
362 self.inner
363 .get::<BinaryArray, Hash>(BlockField::Hash.as_ref())
364 }
365
366 pub fn parent_hash(&self) -> Result<Hash, ReadError> {
368 self.inner
369 .get::<BinaryArray, Hash>(BlockField::ParentHash.as_ref())
370 }
371
372 pub fn nonce(&self) -> Result<Option<FixedSizeData<8>>, ReadError> {
374 self.inner
375 .get_nullable::<BinaryArray, FixedSizeData<8>>(BlockField::Nonce.as_ref())
376 }
377
378 pub fn sha3_uncles(&self) -> Result<Hash, ReadError> {
380 self.inner
381 .get::<BinaryArray, Hash>(BlockField::Sha3Uncles.as_ref())
382 }
383
384 pub fn logs_bloom(&self) -> Result<Data, ReadError> {
386 self.inner
387 .get::<BinaryArray, Data>(BlockField::LogsBloom.as_ref())
388 }
389
390 pub fn transactions_root(&self) -> Result<Hash, ReadError> {
392 self.inner
393 .get::<BinaryArray, Hash>(BlockField::TransactionsRoot.as_ref())
394 }
395
396 pub fn state_root(&self) -> Result<Hash, ReadError> {
398 self.inner
399 .get::<BinaryArray, Hash>(BlockField::StateRoot.as_ref())
400 }
401
402 pub fn receipts_root(&self) -> Result<Hash, ReadError> {
404 self.inner
405 .get::<BinaryArray, Hash>(BlockField::ReceiptsRoot.as_ref())
406 }
407
408 pub fn miner(&self) -> Result<Address, ReadError> {
410 self.inner
411 .get::<BinaryArray, Address>(BlockField::Miner.as_ref())
412 }
413
414 pub fn difficulty(&self) -> Result<Option<Quantity>, ReadError> {
416 self.inner
417 .get_nullable::<BinaryArray, Quantity>(BlockField::Difficulty.as_ref())
418 }
419
420 pub fn total_difficulty(&self) -> Result<Option<Quantity>, ReadError> {
422 self.inner
423 .get_nullable::<BinaryArray, Quantity>(BlockField::TotalDifficulty.as_ref())
424 }
425
426 pub fn extra_data(&self) -> Result<Data, ReadError> {
428 self.inner
429 .get::<BinaryArray, Data>(BlockField::ExtraData.as_ref())
430 }
431
432 pub fn size(&self) -> Result<Quantity, ReadError> {
434 self.inner
435 .get::<BinaryArray, Quantity>(BlockField::Size.as_ref())
436 }
437
438 pub fn gas_limit(&self) -> Result<Quantity, ReadError> {
440 self.inner
441 .get::<BinaryArray, Quantity>(BlockField::GasLimit.as_ref())
442 }
443
444 pub fn gas_used(&self) -> Result<Quantity, ReadError> {
446 self.inner
447 .get::<BinaryArray, Quantity>(BlockField::GasUsed.as_ref())
448 }
449
450 pub fn timestamp(&self) -> Result<Quantity, ReadError> {
452 self.inner
453 .get::<BinaryArray, Quantity>(BlockField::Timestamp.as_ref())
454 }
455
456 pub fn uncles(&self) -> Result<Option<Vec<FixedSizeData<32>>>, ReadError> {
458 let all = self
459 .inner
460 .get_nullable::<BinaryArray, Data>(BlockField::Uncles.as_ref())?;
461 let Some(data) = all else {
462 return Ok(None);
463 };
464 let mut uncles = Vec::new();
465 for uncle_bytes in data.chunks(32) {
466 let uncle = FixedSizeData::<32>::try_from(uncle_bytes)
467 .context("convert uncle bytes to uncle")
468 .map_err(ReadError::ConversionError)?;
469 uncles.push(uncle);
470 }
471 Ok(Some(uncles))
472 }
473
474 pub fn base_fee_per_gas(&self) -> Result<Option<Quantity>, ReadError> {
476 self.inner
477 .get_nullable::<BinaryArray, Quantity>(BlockField::BaseFeePerGas.as_ref())
478 }
479
480 pub fn blob_gas_used(&self) -> Result<Option<Quantity>, ReadError> {
482 self.inner
483 .get_nullable::<BinaryArray, Quantity>(BlockField::BlobGasUsed.as_ref())
484 }
485
486 pub fn excess_blob_gas(&self) -> Result<Option<Quantity>, ReadError> {
488 self.inner
489 .get_nullable::<BinaryArray, Quantity>(BlockField::ExcessBlobGas.as_ref())
490 }
491
492 pub fn parent_beacon_block_root(&self) -> Result<Option<Hash>, ReadError> {
494 self.inner
495 .get_nullable::<BinaryArray, Hash>(BlockField::ParentBeaconBlockRoot.as_ref())
496 }
497
498 pub fn withdrawals_root(&self) -> Result<Option<Hash>, ReadError> {
500 self.inner
501 .get_nullable::<BinaryArray, Hash>(BlockField::WithdrawalsRoot.as_ref())
502 }
503
504 pub fn withdrawals(&self) -> Result<Option<Vec<Withdrawal>>, ReadError> {
506 let withdrawals_bin = self
507 .inner
508 .get_nullable::<BinaryArray, Data>(BlockField::Withdrawals.as_ref())?;
509 let Some(withdrawals_bin) = withdrawals_bin else {
510 return Ok(None);
511 };
512
513 let deser = bincode::deserialize(&withdrawals_bin)
514 .context("deserialize withdrawals")
515 .map_err(ReadError::ConversionError)?;
516
517 Ok(Some(deser))
518 }
519
520 pub fn l1_block_number(&self) -> Result<Option<BlockNumber>, ReadError> {
522 self.inner
523 .get_nullable::<UInt64Array, _>(BlockField::L1BlockNumber.as_ref())
524 }
525
526 pub fn send_count(&self) -> Result<Option<Quantity>, ReadError> {
528 self.inner
529 .get_nullable::<BinaryArray, Quantity>(BlockField::SendCount.as_ref())
530 }
531
532 pub fn send_root(&self) -> Result<Option<Hash>, ReadError> {
534 self.inner
535 .get_nullable::<BinaryArray, Hash>(BlockField::SendRoot.as_ref())
536 }
537
538 pub fn mix_hash(&self) -> Result<Option<Hash>, ReadError> {
540 self.inner
541 .get_nullable::<BinaryArray, Hash>(BlockField::MixHash.as_ref())
542 }
543}
544
/// Typed accessor for one row of a record batch whose columns are named after
/// `TransactionField` variants.
pub struct TransactionReader<'a> {
    // Row cursor that performs the column lookups and conversions.
    inner: ArrowRowReader<'a>,
}
552
553impl<'a> From<ArrowRowReader<'a>> for TransactionReader<'a> {
554 fn from(inner: ArrowRowReader<'a>) -> Self {
555 Self { inner }
556 }
557}
558
/// Iterator over the rows of a record batch that yields a [`TransactionReader`] per row.
pub type TransactionIterator<'a> = ArrowRowIterator<'a, TransactionReader<'a>>;
561
562impl<'a> TransactionReader<'a> {
563 pub fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result<Self> {
566 let inner = ArrowRowReader::new(batch, row_idx)?;
567 Ok(Self { inner })
568 }
569
570 pub fn iter(batch: &'a RecordBatch) -> TransactionIterator<'a> {
572 TransactionIterator::new(batch)
573 }
574
575 pub fn block_hash(&self) -> Result<Hash, ReadError> {
577 self.inner
578 .get::<BinaryArray, Hash>(TransactionField::BlockHash.as_ref())
579 }
580
581 pub fn block_number(&self) -> Result<BlockNumber, ReadError> {
583 self.inner
584 .get::<UInt64Array, BlockNumber>(TransactionField::BlockNumber.as_ref())
585 }
586
587 pub fn from(&self) -> Result<Option<Address>, ReadError> {
589 self.inner
590 .get_nullable::<BinaryArray, Address>(TransactionField::From.as_ref())
591 }
592
593 pub fn gas(&self) -> Result<Quantity, ReadError> {
595 self.inner
596 .get::<BinaryArray, Quantity>(TransactionField::Gas.as_ref())
597 }
598
599 pub fn gas_price(&self) -> Result<Option<Quantity>, ReadError> {
601 self.inner
602 .get_nullable::<BinaryArray, Quantity>(TransactionField::GasPrice.as_ref())
603 }
604
605 pub fn hash(&self) -> Result<Hash, ReadError> {
607 self.inner
608 .get::<BinaryArray, Hash>(TransactionField::Hash.as_ref())
609 }
610
611 pub fn input(&self) -> Result<Data, ReadError> {
613 self.inner
614 .get::<BinaryArray, Data>(TransactionField::Input.as_ref())
615 }
616
617 pub fn nonce(&self) -> Result<Quantity, ReadError> {
619 self.inner
620 .get::<BinaryArray, Quantity>(TransactionField::Nonce.as_ref())
621 }
622
623 pub fn to(&self) -> Result<Option<Address>, ReadError> {
625 self.inner
626 .get_nullable::<BinaryArray, Address>(TransactionField::To.as_ref())
627 }
628
629 pub fn transaction_index(&self) -> Result<TransactionIndex, ReadError> {
631 self.inner
632 .get::<UInt64Array, TransactionIndex>(TransactionField::TransactionIndex.as_ref())
633 }
634
635 pub fn value(&self) -> Result<Quantity, ReadError> {
637 self.inner
638 .get::<BinaryArray, Quantity>(TransactionField::Value.as_ref())
639 }
640
641 pub fn v(&self) -> Result<Option<Quantity>, ReadError> {
643 self.inner
644 .get_nullable::<BinaryArray, Quantity>(TransactionField::V.as_ref())
645 }
646
647 pub fn r(&self) -> Result<Option<Quantity>, ReadError> {
649 self.inner
650 .get_nullable::<BinaryArray, Quantity>(TransactionField::R.as_ref())
651 }
652
653 pub fn s(&self) -> Result<Option<Quantity>, ReadError> {
655 self.inner
656 .get_nullable::<BinaryArray, Quantity>(TransactionField::S.as_ref())
657 }
658
659 pub fn max_priority_fee_per_gas(&self) -> Result<Option<Quantity>, ReadError> {
661 self.inner
662 .get_nullable::<BinaryArray, Quantity>(TransactionField::MaxPriorityFeePerGas.as_ref())
663 }
664
665 pub fn max_fee_per_gas(&self) -> Result<Option<Quantity>, ReadError> {
667 self.inner
668 .get_nullable::<BinaryArray, Quantity>(TransactionField::MaxFeePerGas.as_ref())
669 }
670
671 pub fn chain_id(&self) -> Result<Option<Quantity>, ReadError> {
673 self.inner
674 .get_nullable::<BinaryArray, Quantity>(TransactionField::ChainId.as_ref())
675 }
676
677 pub fn cumulative_gas_used(&self) -> Result<Quantity, ReadError> {
679 self.inner
680 .get::<BinaryArray, Quantity>(TransactionField::CumulativeGasUsed.as_ref())
681 }
682
683 pub fn effective_gas_price(&self) -> Result<Quantity, ReadError> {
685 self.inner
686 .get::<BinaryArray, Quantity>(TransactionField::EffectiveGasPrice.as_ref())
687 }
688
689 pub fn gas_used(&self) -> Result<Quantity, ReadError> {
691 self.inner
692 .get::<BinaryArray, Quantity>(TransactionField::GasUsed.as_ref())
693 }
694
695 pub fn contract_address(&self) -> Result<Option<Address>, ReadError> {
697 self.inner
698 .get_nullable::<BinaryArray, Address>(TransactionField::ContractAddress.as_ref())
699 }
700
701 pub fn logs_bloom(&self) -> Result<Data, ReadError> {
703 self.inner
704 .get::<BinaryArray, Data>(TransactionField::LogsBloom.as_ref())
705 }
706
707 pub fn type_(&self) -> Result<Option<TransactionType>, ReadError> {
709 let type_ = self
710 .inner
711 .get_nullable::<UInt8Array, u8>(TransactionField::Type.as_ref())?;
712 Ok(type_.map(TransactionType::from))
713 }
714
715 pub fn root(&self) -> Result<Option<Hash>, ReadError> {
717 self.inner
718 .get_nullable::<BinaryArray, Hash>(TransactionField::Root.as_ref())
719 }
720
721 pub fn status(&self) -> Result<Option<TransactionStatus>, ReadError> {
723 let status = self
724 .inner
725 .get_nullable::<UInt8Array, u8>(TransactionField::Status.as_ref())?;
726 let Some(status) = status else {
727 return Ok(None);
728 };
729 let status = TransactionStatus::from_u8(status)
730 .context("convert u8 to transaction status")
731 .map_err(ReadError::ConversionError)?;
732 Ok(Some(status))
733 }
734
735 pub fn sighash(&self) -> Result<Option<Data>, ReadError> {
737 self.inner
738 .get_nullable::<BinaryArray, _>(TransactionField::Sighash.as_ref())
739 }
740
741 pub fn y_parity(&self) -> Result<Option<Quantity>, ReadError> {
743 self.inner
744 .get_nullable::<BinaryArray, Quantity>(TransactionField::YParity.as_ref())
745 }
746
747 pub fn access_list(&self) -> Result<Option<Vec<AccessList>>, ReadError> {
749 let bin = self
750 .inner
751 .get_nullable::<BinaryArray, Data>(TransactionField::AccessList.as_ref())?;
752 let Some(bin) = bin else {
753 return Ok(None);
754 };
755 let deser = bincode::deserialize(&bin)
756 .context("deserialize access list")
757 .map_err(ReadError::ConversionError)?;
758 Ok(Some(deser))
759 }
760
761 pub fn authorization_list(&self) -> Result<Option<Vec<Authorization>>, ReadError> {
763 let bin = self
764 .inner
765 .get_nullable::<BinaryArray, Data>(TransactionField::AuthorizationList.as_ref())?;
766 let Some(bin) = bin else {
767 return Ok(None);
768 };
769 let deser = bincode::deserialize(&bin)
770 .context("deserialize authorization list")
771 .map_err(ReadError::ConversionError)?;
772 Ok(Some(deser))
773 }
774
775 pub fn l1_fee(&self) -> Result<Option<Quantity>, ReadError> {
780 self.inner
781 .get_nullable::<BinaryArray, Quantity>(TransactionField::L1Fee.as_ref())
782 }
783
784 pub fn max_fee_per_blob_gas(&self) -> Result<Option<Quantity>, ReadError> {
786 self.inner
787 .get_nullable::<BinaryArray, Quantity>(TransactionField::MaxFeePerBlobGas.as_ref())
788 }
789
790 pub fn blob_versioned_hashes(&self) -> Result<Option<Vec<Hash>>, ReadError> {
792 let bin = self
793 .inner
794 .get_nullable::<BinaryArray, Data>(TransactionField::BlobVersionedHashes.as_ref())?;
795 let Some(bin) = bin else {
796 return Ok(None);
797 };
798 let mut hashes = Vec::new();
799 for hash_bytes in bin.chunks(32) {
800 let hash = Hash::try_from(hash_bytes)
801 .context("convert blob versioned hash bytes to hash")
802 .map_err(ReadError::ConversionError)?;
803 hashes.push(hash);
804 }
805 Ok(Some(hashes))
806 }
807
808 pub fn l1_gas_price(&self) -> Result<Option<Quantity>, ReadError> {
810 self.inner
811 .get_nullable::<BinaryArray, Quantity>(TransactionField::L1GasPrice.as_ref())
812 }
813
814 pub fn l1_gas_used(&self) -> Result<Option<Quantity>, ReadError> {
816 self.inner
817 .get_nullable::<BinaryArray, Quantity>(TransactionField::L1GasUsed.as_ref())
818 }
819
820 pub fn l1_fee_scalar(&self) -> Result<Option<f64>, ReadError> {
822 let scalar = self
823 .inner
824 .get_nullable::<BinaryArray, Quantity>(TransactionField::L1FeeScalar.as_ref())?;
825 let Some(scalar_utf8) = scalar else {
826 return Ok(None);
827 };
828 let scalar_str = std::str::from_utf8(&scalar_utf8)
830 .context("convert l1 fee scalar to string")
831 .map_err(ReadError::ConversionError)?;
832
833 let scalar_f64: f64 = scalar_str
834 .parse()
835 .context("parse l1 fee scalar as f64")
836 .map_err(ReadError::ConversionError)?;
837 Ok(Some(scalar_f64))
838 }
839
840 pub fn gas_used_for_l1(&self) -> Result<Option<Quantity>, ReadError> {
842 self.inner
843 .get_nullable::<BinaryArray, Quantity>(TransactionField::GasUsedForL1.as_ref())
844 }
845
846 pub fn blob_gas_price(&self) -> Result<Option<Quantity>, ReadError> {
848 self.inner
849 .get_nullable::<BinaryArray, Quantity>(TransactionField::BlobGasPrice.as_ref())
850 }
851
852 pub fn blob_gas_used(&self) -> Result<Option<Quantity>, ReadError> {
854 self.inner
855 .get_nullable::<BinaryArray, Quantity>(TransactionField::BlobGasUsed.as_ref())
856 }
857
858 pub fn deposit_nonce(&self) -> Result<Option<Quantity>, ReadError> {
860 self.inner
861 .get_nullable::<BinaryArray, Quantity>(TransactionField::DepositNonce.as_ref())
862 }
863
864 pub fn deposit_receipt_version(&self) -> Result<Option<Quantity>, ReadError> {
866 self.inner
867 .get_nullable::<BinaryArray, Quantity>(TransactionField::DepositReceiptVersion.as_ref())
868 }
869
870 pub fn l1_base_fee_scalar(&self) -> Result<Option<Quantity>, ReadError> {
872 self.inner
873 .get_nullable::<BinaryArray, Quantity>(TransactionField::L1BaseFeeScalar.as_ref())
874 }
875
876 pub fn l1_blob_base_fee(&self) -> Result<Option<Quantity>, ReadError> {
878 self.inner
879 .get_nullable::<BinaryArray, Quantity>(TransactionField::L1BlobBaseFee.as_ref())
880 }
881
882 pub fn l1_blob_base_fee_scalar(&self) -> Result<Option<Quantity>, ReadError> {
884 self.inner
885 .get_nullable::<BinaryArray, Quantity>(TransactionField::L1BlobBaseFeeScalar.as_ref())
886 }
887
888 pub fn l1_block_number(&self) -> Result<Option<Quantity>, ReadError> {
890 self.inner
891 .get_nullable::<BinaryArray, _>(TransactionField::L1BlockNumber.as_ref())
892 }
893
894 pub fn mint(&self) -> Result<Option<Quantity>, ReadError> {
896 self.inner
897 .get_nullable::<BinaryArray, Quantity>(TransactionField::Mint.as_ref())
898 }
899
900 pub fn source_hash(&self) -> Result<Option<Hash>, ReadError> {
902 self.inner
903 .get_nullable::<BinaryArray, Hash>(TransactionField::SourceHash.as_ref())
904 }
905}
906
/// Typed accessor for one row of a record batch whose columns are named after
/// `TraceField` variants.
pub struct TraceReader<'a> {
    // Row cursor that performs the column lookups and conversions.
    inner: ArrowRowReader<'a>,
}
914
915impl<'a> From<ArrowRowReader<'a>> for TraceReader<'a> {
916 fn from(inner: ArrowRowReader<'a>) -> Self {
917 Self { inner }
918 }
919}
920
/// Iterator over the rows of a record batch that yields a [`TraceReader`] per row.
pub type TraceIterator<'a> = ArrowRowIterator<'a, TraceReader<'a>>;
923
924impl<'a> TraceReader<'a> {
925 pub fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result<Self> {
928 let inner = ArrowRowReader::new(batch, row_idx)?;
929 Ok(Self { inner })
930 }
931
932 pub fn iter(batch: &'a RecordBatch) -> TraceIterator<'a> {
934 TraceIterator::new(batch)
935 }
936
937 pub fn block_hash(&self) -> Result<Hash, ReadError> {
939 self.inner
940 .get::<BinaryArray, Hash>(TraceField::BlockHash.as_ref())
941 }
942
943 pub fn block_number(&self) -> Result<u64, ReadError> {
945 self.inner
946 .get::<UInt64Array, _>(TraceField::BlockNumber.as_ref())
947 }
948
949 pub fn from(&self) -> Result<Option<Address>, ReadError> {
951 self.inner
952 .get_nullable::<BinaryArray, Address>(TraceField::From.as_ref())
953 }
954
955 pub fn to(&self) -> Result<Option<Address>, ReadError> {
957 self.inner
958 .get_nullable::<BinaryArray, Address>(TraceField::To.as_ref())
959 }
960
961 pub fn call_type(&self) -> Result<Option<String>, ReadError> {
963 self.inner
964 .get_nullable::<StringArray, String>(TraceField::CallType.as_ref())
965 }
966
967 pub fn gas(&self) -> Result<Option<Quantity>, ReadError> {
969 self.inner
970 .get_nullable::<BinaryArray, Quantity>(TraceField::Gas.as_ref())
971 }
972
973 pub fn input(&self) -> Result<Option<Data>, ReadError> {
975 self.inner
976 .get_nullable::<BinaryArray, Data>(TraceField::Input.as_ref())
977 }
978
979 pub fn init(&self) -> Result<Option<Data>, ReadError> {
981 self.inner
982 .get_nullable::<BinaryArray, Data>(TraceField::Init.as_ref())
983 }
984
985 pub fn value(&self) -> Result<Option<Quantity>, ReadError> {
987 self.inner
988 .get_nullable::<BinaryArray, Quantity>(TraceField::Value.as_ref())
989 }
990
991 pub fn author(&self) -> Result<Option<Address>, ReadError> {
993 self.inner
994 .get_nullable::<BinaryArray, Address>(TraceField::Author.as_ref())
995 }
996
997 pub fn reward_type(&self) -> Result<Option<String>, ReadError> {
999 self.inner
1000 .get_nullable::<StringArray, String>(TraceField::RewardType.as_ref())
1001 }
1002
1003 pub fn address(&self) -> Result<Option<Address>, ReadError> {
1005 self.inner
1006 .get_nullable::<BinaryArray, Address>(TraceField::Address.as_ref())
1007 }
1008
1009 pub fn code(&self) -> Result<Option<Data>, ReadError> {
1011 self.inner
1012 .get_nullable::<BinaryArray, Data>(TraceField::Code.as_ref())
1013 }
1014
1015 pub fn gas_used(&self) -> Result<Option<Quantity>, ReadError> {
1017 self.inner
1018 .get_nullable::<BinaryArray, Quantity>(TraceField::GasUsed.as_ref())
1019 }
1020
1021 pub fn output(&self) -> Result<Option<Data>, ReadError> {
1023 self.inner
1024 .get_nullable::<BinaryArray, Data>(TraceField::Output.as_ref())
1025 }
1026
1027 pub fn subtraces(&self) -> Result<Option<u64>, ReadError> {
1029 self.inner
1030 .get_nullable::<UInt64Array, u64>(TraceField::Subtraces.as_ref())
1031 }
1032
1033 pub fn trace_address(&self) -> Result<Option<Vec<u64>>, ReadError> {
1035 let bin = self
1036 .inner
1037 .get_nullable::<BinaryArray, Data>(TraceField::TraceAddress.as_ref())?;
1038 let Some(bin) = bin else {
1039 return Ok(None);
1040 };
1041 let deser = bincode::deserialize(&bin)
1042 .context("deserialize trace address")
1043 .map_err(ReadError::ConversionError)?;
1044 Ok(Some(deser))
1045 }
1046
1047 pub fn transaction_hash(&self) -> Result<Option<Hash>, ReadError> {
1049 self.inner
1050 .get_nullable::<BinaryArray, Hash>(TraceField::TransactionHash.as_ref())
1051 }
1052
1053 pub fn transaction_position(&self) -> Result<Option<u64>, ReadError> {
1055 self.inner
1056 .get_nullable::<UInt64Array, u64>(TraceField::TransactionPosition.as_ref())
1057 }
1058
1059 pub fn type_(&self) -> Result<Option<String>, ReadError> {
1061 self.inner
1062 .get_nullable::<StringArray, String>(TraceField::Type.as_ref())
1063 }
1064
1065 pub fn error(&self) -> Result<Option<String>, ReadError> {
1067 self.inner
1068 .get_nullable::<StringArray, String>(TraceField::Error.as_ref())
1069 }
1070
1071 pub fn sighash(&self) -> Result<Option<Data>, ReadError> {
1073 self.inner
1074 .get_nullable::<BinaryArray, _>(TraceField::Sighash.as_ref())
1075 }
1076
1077 pub fn action_address(&self) -> Result<Option<Address>, ReadError> {
1079 self.inner
1080 .get_nullable::<BinaryArray, Address>(TraceField::ActionAddress.as_ref())
1081 }
1082
1083 pub fn balance(&self) -> Result<Option<Quantity>, ReadError> {
1085 self.inner
1086 .get_nullable::<BinaryArray, Quantity>(TraceField::Balance.as_ref())
1087 }
1088
1089 pub fn refund_address(&self) -> Result<Option<Address>, ReadError> {
1091 self.inner
1092 .get_nullable::<BinaryArray, Address>(TraceField::RefundAddress.as_ref())
1093 }
1094}
1095
1096#[cfg(test)]
1097mod tests {
1098 use super::*;
1099 use anyhow::Context;
1100
// Marker trait used as a bound by the `assert_not_nullable` helpers below.
// It is implemented only for the plain value types listed here and not for
// `Option`, so an accessor returning `Result<Option<T>, _>` cannot be passed
// where a non-nullable accessor is expected.
trait NotOption {}

impl NotOption for hypersync_format::Quantity {}
impl NotOption for hypersync_format::Data {}
impl NotOption for hypersync_format::UInt {}
impl<const N: usize> NotOption for hypersync_format::FixedSizeData<N> {}
impl NotOption for u64 {}
1108
// Checks that the return type of every `LogReader` accessor agrees with
// `LogField::is_nullable`. Accessors are passed only to pin their signature
// at compile time; they are never called.
#[test]
fn test_nullability_matches_schema() {
    // Accepts only accessors returning `Result<Option<T>, ReadError>` and
    // asserts that the matching field is declared nullable.
    fn assert_nullable<'a, T, F>(_: F, log_field: LogField)
    where
        F: FnOnce(&LogReader<'a>) -> Result<Option<T>, ReadError>,
    {
        assert!(log_field.is_nullable(), "Optional type should be nullable");
    }

    // Accepts only accessors returning `Result<T, ReadError>` and asserts that
    // the matching field is declared non-nullable.
    fn assert_not_nullable<'a, T, F>(_: F, log_field: LogField)
    where
        F: FnOnce(&LogReader<'a>) -> Result<T, ReadError>,
        // Rejects accessors whose `T` is itself an `Option`.
        T: NotOption,
    {
        assert!(!log_field.is_nullable(), "should not be nullable");
    }
    // The match has no wildcard arm, so every `LogField` variant must be
    // paired with an accessor here.
    for field in LogField::all() {
        match field {
            LogField::Removed => assert_nullable(LogReader::removed, field),
            LogField::Topic0 => assert_nullable(LogReader::topic0, field),
            LogField::Topic1 => assert_nullable(LogReader::topic1, field),
            LogField::Topic2 => assert_nullable(LogReader::topic2, field),
            LogField::Topic3 => assert_nullable(LogReader::topic3, field),
            LogField::LogIndex => assert_not_nullable(LogReader::log_index, field),
            LogField::TransactionIndex => {
                assert_not_nullable(LogReader::transaction_index, field)
            }
            LogField::TransactionHash => {
                assert_not_nullable(LogReader::transaction_hash, field)
            }
            LogField::BlockHash => assert_not_nullable(LogReader::block_hash, field),
            LogField::BlockNumber => assert_not_nullable(LogReader::block_number, field),
            LogField::Address => assert_not_nullable(LogReader::address, field),
            LogField::Data => assert_not_nullable(LogReader::data, field),
        }
    }
}
1150
// Checks that the return type of every `BlockReader` accessor agrees with
// `BlockField::is_nullable`. Accessors are passed only to pin their signature
// at compile time; they are never called.
#[test]
fn test_block_nullability_matches_schema() {
    // Accepts only accessors returning `Result<Option<T>, ReadError>` and
    // asserts that the matching field is declared nullable.
    fn assert_nullable<'a, T, F>(_: F, block_field: BlockField)
    where
        F: FnOnce(&BlockReader<'a>) -> Result<Option<T>, ReadError>,
    {
        assert!(
            block_field.is_nullable(),
            "Optional type should be nullable"
        );
    }

    // Accepts only accessors returning `Result<T, ReadError>` with a `T` that
    // is not an `Option`, and asserts that the field is declared non-nullable.
    fn assert_not_nullable<'a, T, F>(_: F, block_field: BlockField)
    where
        F: FnOnce(&BlockReader<'a>) -> Result<T, ReadError>,
        T: NotOption,
    {
        assert!(!block_field.is_nullable(), "should not be nullable");
    }

    // The match has no wildcard arm, so every `BlockField` variant must be
    // paired with an accessor here.
    for field in BlockField::all() {
        match field {
            // Accessors that return `Option`.
            BlockField::Nonce => assert_nullable(BlockReader::nonce, field),
            BlockField::Difficulty => assert_nullable(BlockReader::difficulty, field),
            BlockField::TotalDifficulty => {
                assert_nullable(BlockReader::total_difficulty, field)
            }
            BlockField::Uncles => assert_nullable(BlockReader::uncles, field),
            BlockField::BaseFeePerGas => assert_nullable(BlockReader::base_fee_per_gas, field),
            BlockField::BlobGasUsed => assert_nullable(BlockReader::blob_gas_used, field),
            BlockField::ExcessBlobGas => assert_nullable(BlockReader::excess_blob_gas, field),
            BlockField::ParentBeaconBlockRoot => {
                assert_nullable(BlockReader::parent_beacon_block_root, field)
            }
            BlockField::WithdrawalsRoot => {
                assert_nullable(BlockReader::withdrawals_root, field)
            }
            BlockField::Withdrawals => assert_nullable(BlockReader::withdrawals, field),
            BlockField::L1BlockNumber => assert_nullable(BlockReader::l1_block_number, field),
            BlockField::SendCount => assert_nullable(BlockReader::send_count, field),
            BlockField::SendRoot => assert_nullable(BlockReader::send_root, field),
            BlockField::MixHash => assert_nullable(BlockReader::mix_hash, field),
            // Accessors that return the value directly.
            BlockField::Number => assert_not_nullable(BlockReader::number, field),
            BlockField::Hash => assert_not_nullable(BlockReader::hash, field),
            BlockField::ParentHash => assert_not_nullable(BlockReader::parent_hash, field),
            BlockField::Sha3Uncles => assert_not_nullable(BlockReader::sha3_uncles, field),
            BlockField::LogsBloom => assert_not_nullable(BlockReader::logs_bloom, field),
            BlockField::TransactionsRoot => {
                assert_not_nullable(BlockReader::transactions_root, field)
            }
            BlockField::StateRoot => assert_not_nullable(BlockReader::state_root, field),
            BlockField::ReceiptsRoot => assert_not_nullable(BlockReader::receipts_root, field),
            BlockField::Miner => assert_not_nullable(BlockReader::miner, field),
            BlockField::ExtraData => assert_not_nullable(BlockReader::extra_data, field),
            BlockField::Size => assert_not_nullable(BlockReader::size, field),
            BlockField::GasLimit => assert_not_nullable(BlockReader::gas_limit, field),
            BlockField::GasUsed => assert_not_nullable(BlockReader::gas_used, field),
            BlockField::Timestamp => assert_not_nullable(BlockReader::timestamp, field),
        }
    }
}
1214
#[test]
fn test_transaction_nullability_matches_schema() {
    /// Only accepts a getter whose return type is `Result<Option<T>, ReadError>`, then
    /// checks that the matching field reports itself as nullable.
    ///
    /// The getter is never invoked; it is passed purely so the compiler verifies its
    /// signature.
    fn expect_optional<'a, T, G>(_getter: G, column: TransactionField)
    where
        G: FnOnce(&TransactionReader<'a>) -> Result<Option<T>, ReadError>,
    {
        assert!(column.is_nullable(), "Optional type should be nullable");
    }

    /// Only accepts a getter whose success type implements `NotOption`, then checks that
    /// the matching field reports itself as not nullable.
    ///
    /// NOTE(review): `NotOption` is defined outside this block; presumably it is
    /// implemented for every non-`Option` return type — confirm.
    fn expect_required<'a, T, G>(_getter: G, column: TransactionField)
    where
        T: NotOption,
        G: FnOnce(&TransactionReader<'a>) -> Result<T, ReadError>,
    {
        assert!(!column.is_nullable(), "should not be nullable");
    }

    // The match has no wildcard arm, so a newly added `TransactionField` variant fails
    // to compile here until it is paired with its reader method.
    for column in TransactionField::all() {
        match column {
            // Getters that return a plain (non-optional) value.
            TransactionField::BlockHash => {
                expect_required(TransactionReader::block_hash, column)
            }
            TransactionField::BlockNumber => {
                expect_required(TransactionReader::block_number, column)
            }
            TransactionField::CumulativeGasUsed => {
                expect_required(TransactionReader::cumulative_gas_used, column)
            }
            TransactionField::EffectiveGasPrice => {
                expect_required(TransactionReader::effective_gas_price, column)
            }
            TransactionField::Gas => expect_required(TransactionReader::gas, column),
            TransactionField::GasUsed => expect_required(TransactionReader::gas_used, column),
            TransactionField::Hash => expect_required(TransactionReader::hash, column),
            TransactionField::Input => expect_required(TransactionReader::input, column),
            TransactionField::LogsBloom => {
                expect_required(TransactionReader::logs_bloom, column)
            }
            TransactionField::Nonce => expect_required(TransactionReader::nonce, column),
            TransactionField::TransactionIndex => {
                expect_required(TransactionReader::transaction_index, column)
            }
            TransactionField::Value => expect_required(TransactionReader::value, column),

            // Getters that return an `Option`.
            TransactionField::AccessList => {
                expect_optional(TransactionReader::access_list, column)
            }
            TransactionField::AuthorizationList => {
                expect_optional(TransactionReader::authorization_list, column)
            }
            TransactionField::BlobGasPrice => {
                expect_optional(TransactionReader::blob_gas_price, column)
            }
            TransactionField::BlobGasUsed => {
                expect_optional(TransactionReader::blob_gas_used, column)
            }
            TransactionField::BlobVersionedHashes => {
                expect_optional(TransactionReader::blob_versioned_hashes, column)
            }
            TransactionField::ChainId => expect_optional(TransactionReader::chain_id, column),
            TransactionField::ContractAddress => {
                expect_optional(TransactionReader::contract_address, column)
            }
            TransactionField::DepositNonce => {
                expect_optional(TransactionReader::deposit_nonce, column)
            }
            TransactionField::DepositReceiptVersion => {
                expect_optional(TransactionReader::deposit_receipt_version, column)
            }
            TransactionField::From => expect_optional(TransactionReader::from, column),
            TransactionField::GasPrice => {
                expect_optional(TransactionReader::gas_price, column)
            }
            TransactionField::GasUsedForL1 => {
                expect_optional(TransactionReader::gas_used_for_l1, column)
            }
            TransactionField::L1BaseFeeScalar => {
                expect_optional(TransactionReader::l1_base_fee_scalar, column)
            }
            TransactionField::L1BlobBaseFee => {
                expect_optional(TransactionReader::l1_blob_base_fee, column)
            }
            TransactionField::L1BlobBaseFeeScalar => {
                expect_optional(TransactionReader::l1_blob_base_fee_scalar, column)
            }
            TransactionField::L1BlockNumber => {
                expect_optional(TransactionReader::l1_block_number, column)
            }
            TransactionField::L1Fee => expect_optional(TransactionReader::l1_fee, column),
            TransactionField::L1FeeScalar => {
                expect_optional(TransactionReader::l1_fee_scalar, column)
            }
            TransactionField::L1GasPrice => {
                expect_optional(TransactionReader::l1_gas_price, column)
            }
            TransactionField::L1GasUsed => {
                expect_optional(TransactionReader::l1_gas_used, column)
            }
            TransactionField::MaxFeePerBlobGas => {
                expect_optional(TransactionReader::max_fee_per_blob_gas, column)
            }
            TransactionField::MaxFeePerGas => {
                expect_optional(TransactionReader::max_fee_per_gas, column)
            }
            TransactionField::MaxPriorityFeePerGas => {
                expect_optional(TransactionReader::max_priority_fee_per_gas, column)
            }
            TransactionField::Mint => expect_optional(TransactionReader::mint, column),
            TransactionField::R => expect_optional(TransactionReader::r, column),
            TransactionField::Root => expect_optional(TransactionReader::root, column),
            TransactionField::S => expect_optional(TransactionReader::s, column),
            TransactionField::Sighash => expect_optional(TransactionReader::sighash, column),
            TransactionField::SourceHash => {
                expect_optional(TransactionReader::source_hash, column)
            }
            TransactionField::Status => expect_optional(TransactionReader::status, column),
            TransactionField::To => expect_optional(TransactionReader::to, column),
            TransactionField::Type => expect_optional(TransactionReader::type_, column),
            TransactionField::V => expect_optional(TransactionReader::v, column),
            TransactionField::YParity => expect_optional(TransactionReader::y_parity, column),
        }
    }
}
1341
#[test]
fn test_trace_nullability_matches_schema() {
    /// Only accepts a getter whose return type is `Result<Option<T>, ReadError>`, then
    /// checks that the matching field reports itself as nullable.
    ///
    /// The getter is never invoked; it is passed purely so the compiler verifies its
    /// signature.
    fn expect_optional<'a, T, G>(_getter: G, column: TraceField)
    where
        G: FnOnce(&TraceReader<'a>) -> Result<Option<T>, ReadError>,
    {
        assert!(column.is_nullable(), "Optional type should be nullable");
    }

    /// Only accepts a getter whose success type implements `NotOption`, then checks that
    /// the matching field reports itself as not nullable.
    ///
    /// NOTE(review): `NotOption` is defined outside this block; presumably it is
    /// implemented for every non-`Option` return type — confirm.
    fn expect_required<'a, T, G>(_getter: G, column: TraceField)
    where
        T: NotOption,
        G: FnOnce(&TraceReader<'a>) -> Result<T, ReadError>,
    {
        assert!(!column.is_nullable(), "should not be nullable");
    }

    // The match has no wildcard arm, so a newly added `TraceField` variant fails to
    // compile here until it is paired with its reader method.
    for column in TraceField::all() {
        match column {
            // Getters that return a plain (non-optional) value.
            TraceField::BlockHash => expect_required(TraceReader::block_hash, column),
            TraceField::BlockNumber => expect_required(TraceReader::block_number, column),

            // Getters that return an `Option`.
            TraceField::ActionAddress => expect_optional(TraceReader::action_address, column),
            TraceField::Address => expect_optional(TraceReader::address, column),
            TraceField::Author => expect_optional(TraceReader::author, column),
            TraceField::Balance => expect_optional(TraceReader::balance, column),
            TraceField::CallType => expect_optional(TraceReader::call_type, column),
            TraceField::Code => expect_optional(TraceReader::code, column),
            TraceField::Error => expect_optional(TraceReader::error, column),
            TraceField::From => expect_optional(TraceReader::from, column),
            TraceField::Gas => expect_optional(TraceReader::gas, column),
            TraceField::GasUsed => expect_optional(TraceReader::gas_used, column),
            TraceField::Init => expect_optional(TraceReader::init, column),
            TraceField::Input => expect_optional(TraceReader::input, column),
            TraceField::Output => expect_optional(TraceReader::output, column),
            TraceField::RefundAddress => expect_optional(TraceReader::refund_address, column),
            TraceField::RewardType => expect_optional(TraceReader::reward_type, column),
            TraceField::Sighash => expect_optional(TraceReader::sighash, column),
            TraceField::Subtraces => expect_optional(TraceReader::subtraces, column),
            TraceField::To => expect_optional(TraceReader::to, column),
            TraceField::TraceAddress => expect_optional(TraceReader::trace_address, column),
            TraceField::TransactionHash => {
                expect_optional(TraceReader::transaction_hash, column)
            }
            TraceField::TransactionPosition => {
                expect_optional(TraceReader::transaction_position, column)
            }
            TraceField::Type => expect_optional(TraceReader::type_, column),
            TraceField::Value => expect_optional(TraceReader::value, column),
        }
    }
}
1398
/// Asserts that `result` is `Ok`, discarding the success value.
///
/// # Panics
///
/// Panics with the message `should be ok` followed by the error's `Debug` output when
/// `result` is an `Err`.
///
/// `#[track_caller]` makes the reported panic location the line that called this helper
/// rather than a line inside it, so a failure identifies which call site produced the
/// error.
#[track_caller]
fn assert_ok<T, E: std::fmt::Debug>(result: Result<T, E>) {
    // Only success matters here; the `Ok` payload is intentionally dropped.
    let _ = result.expect("should be ok");
}
1402
1403 #[tokio::test(flavor = "multi_thread")]
1404 #[ignore = "integration test for returned schema"]
1405 async fn test_readers_integration() -> anyhow::Result<()> {
1406 use crate::{
1407 net_types::{LogField, LogFilter, TransactionField, TransactionFilter},
1408 Client, Query,
1409 };
1410 let client = Client::builder()
1411 .url("https://eth-traces.hypersync.xyz")
1412 .api_token(dotenvy::var("HYPERSYNC_API_TOKEN")?)
1413 .build()
1414 .context("Failed to build client")?;
1415
1416 let query = Query::new()
1417 .from_block(20_000_000)
1418 .to_block_excl(20_000_001)
1419 .include_all_blocks()
1420 .where_logs(LogFilter::all())
1421 .where_transactions(TransactionFilter::all())
1422 .select_log_fields(LogField::all())
1423 .select_block_fields(BlockField::all())
1424 .select_transaction_fields(TransactionField::all())
1425 .select_trace_fields(TraceField::all());
1426
1427 let res = client.collect_arrow(query, Default::default()).await?;
1428
1429 let mut num_logs = 0;
1430
1431 for batch in res.data.logs {
1432 for log_reader in LogReader::iter(&batch) {
1433 num_logs += 1;
1434
1435 for log_field in LogField::all() {
1436 match log_field {
1437 LogField::Removed => assert_ok(log_reader.removed()),
1438 LogField::LogIndex => assert_ok(log_reader.log_index()),
1439 LogField::TransactionIndex => assert_ok(log_reader.transaction_index()),
1440 LogField::TransactionHash => assert_ok(log_reader.transaction_hash()),
1441 LogField::BlockHash => assert_ok(log_reader.block_hash()),
1442 LogField::BlockNumber => assert_ok(log_reader.block_number()),
1443 LogField::Address => assert_ok(log_reader.address()),
1444 LogField::Data => assert_ok(log_reader.data()),
1445 LogField::Topic0 => assert_ok(log_reader.topic0()),
1446 LogField::Topic1 => assert_ok(log_reader.topic1()),
1447 LogField::Topic2 => assert_ok(log_reader.topic2()),
1448 LogField::Topic3 => assert_ok(log_reader.topic3()),
1449 }
1450 }
1451 }
1452 }
1453
1454 println!("num_logs: {}", num_logs);
1455
1456 let mut num_transactions = 0;
1457
1458 for batch in res.data.transactions {
1459 for transaction_reader in TransactionReader::iter(&batch) {
1460 num_transactions += 1;
1461
1462 for transaction_field in TransactionField::all() {
1463 match transaction_field {
1464 TransactionField::BlockHash => assert_ok(transaction_reader.block_hash()),
1465 TransactionField::BlockNumber => {
1466 assert_ok(transaction_reader.block_number())
1467 }
1468 TransactionField::From => assert_ok(transaction_reader.from()),
1469 TransactionField::Gas => assert_ok(transaction_reader.gas()),
1470 TransactionField::GasPrice => assert_ok(transaction_reader.gas_price()),
1471 TransactionField::Hash => assert_ok(transaction_reader.hash()),
1472 TransactionField::Input => assert_ok(transaction_reader.input()),
1473 TransactionField::Nonce => assert_ok(transaction_reader.nonce()),
1474 TransactionField::To => assert_ok(transaction_reader.to()),
1475 TransactionField::TransactionIndex => {
1476 assert_ok(transaction_reader.transaction_index())
1477 }
1478 TransactionField::Value => assert_ok(transaction_reader.value()),
1479 TransactionField::V => assert_ok(transaction_reader.v()),
1480 TransactionField::R => assert_ok(transaction_reader.r()),
1481 TransactionField::S => assert_ok(transaction_reader.s()),
1482 TransactionField::MaxPriorityFeePerGas => {
1483 assert_ok(transaction_reader.max_priority_fee_per_gas())
1484 }
1485 TransactionField::MaxFeePerGas => {
1486 assert_ok(transaction_reader.max_fee_per_gas())
1487 }
1488 TransactionField::ChainId => assert_ok(transaction_reader.chain_id()),
1489 TransactionField::CumulativeGasUsed => {
1490 assert_ok(transaction_reader.cumulative_gas_used())
1491 }
1492 TransactionField::EffectiveGasPrice => {
1493 assert_ok(transaction_reader.effective_gas_price())
1494 }
1495 TransactionField::GasUsed => assert_ok(transaction_reader.gas_used()),
1496 TransactionField::ContractAddress => {
1497 assert_ok(transaction_reader.contract_address())
1498 }
1499 TransactionField::LogsBloom => assert_ok(transaction_reader.logs_bloom()),
1500 TransactionField::Type => assert_ok(transaction_reader.type_()),
1501 TransactionField::Root => assert_ok(transaction_reader.root()),
1502 TransactionField::Status => assert_ok(transaction_reader.status()),
1503 TransactionField::Sighash => assert_ok(transaction_reader.sighash()),
1504 TransactionField::YParity => assert_ok(transaction_reader.y_parity()),
1505 TransactionField::AccessList => assert_ok(transaction_reader.access_list()),
1506 TransactionField::AuthorizationList => {
1507 assert_ok(transaction_reader.authorization_list())
1508 }
1509 TransactionField::L1Fee => assert_ok(transaction_reader.l1_fee()),
1510 TransactionField::MaxFeePerBlobGas => {
1511 assert_ok(transaction_reader.max_fee_per_blob_gas())
1512 }
1513 TransactionField::BlobVersionedHashes => {
1514 assert_ok(transaction_reader.blob_versioned_hashes())
1515 }
1516 TransactionField::L1GasPrice => {
1517 assert_ok(transaction_reader.l1_gas_price())
1518 }
1519 TransactionField::L1GasUsed => assert_ok(transaction_reader.l1_gas_used()),
1520 TransactionField::L1FeeScalar => {
1521 assert_ok(transaction_reader.l1_fee_scalar())
1522 }
1523 TransactionField::GasUsedForL1 => {
1524 assert_ok(transaction_reader.gas_used_for_l1())
1525 }
1526 TransactionField::BlobGasPrice => {
1527 assert_ok(transaction_reader.blob_gas_price())
1528 }
1529 TransactionField::BlobGasUsed => {
1530 assert_ok(transaction_reader.blob_gas_used())
1531 }
1532 TransactionField::DepositNonce => {
1533 assert_ok(transaction_reader.deposit_nonce())
1534 }
1535 TransactionField::DepositReceiptVersion => {
1536 assert_ok(transaction_reader.deposit_receipt_version())
1537 }
1538 TransactionField::L1BaseFeeScalar => {
1539 assert_ok(transaction_reader.l1_base_fee_scalar())
1540 }
1541 TransactionField::L1BlobBaseFee => {
1542 assert_ok(transaction_reader.l1_blob_base_fee())
1543 }
1544 TransactionField::L1BlobBaseFeeScalar => {
1545 assert_ok(transaction_reader.l1_blob_base_fee_scalar())
1546 }
1547 TransactionField::L1BlockNumber => {
1548 assert_ok(transaction_reader.l1_block_number())
1549 }
1550 TransactionField::Mint => assert_ok(transaction_reader.mint()),
1551 TransactionField::SourceHash => assert_ok(transaction_reader.source_hash()),
1552 }
1553 }
1554 }
1555 }
1556
1557 println!("num_transactions: {}", num_transactions);
1558 let mut num_blocks = 0;
1559
1560 for batch in res.data.blocks {
1561 for block_reader in BlockReader::iter(&batch) {
1562 num_blocks += 1;
1563
1564 for block_field in BlockField::all() {
1565 match block_field {
1566 BlockField::Number => assert_ok(block_reader.number()),
1567 BlockField::Hash => assert_ok(block_reader.hash()),
1568 BlockField::ParentHash => assert_ok(block_reader.parent_hash()),
1569 BlockField::Nonce => assert_ok(block_reader.nonce()),
1570 BlockField::Sha3Uncles => assert_ok(block_reader.sha3_uncles()),
1571 BlockField::LogsBloom => assert_ok(block_reader.logs_bloom()),
1572 BlockField::TransactionsRoot => assert_ok(block_reader.transactions_root()),
1573 BlockField::StateRoot => assert_ok(block_reader.state_root()),
1574 BlockField::ReceiptsRoot => assert_ok(block_reader.receipts_root()),
1575 BlockField::Miner => assert_ok(block_reader.miner()),
1576 BlockField::Difficulty => assert_ok(block_reader.difficulty()),
1577 BlockField::TotalDifficulty => assert_ok(block_reader.total_difficulty()),
1578 BlockField::ExtraData => assert_ok(block_reader.extra_data()),
1579 BlockField::Size => assert_ok(block_reader.size()),
1580 BlockField::GasLimit => assert_ok(block_reader.gas_limit()),
1581 BlockField::GasUsed => assert_ok(block_reader.gas_used()),
1582 BlockField::Timestamp => assert_ok(block_reader.timestamp()),
1583 BlockField::Uncles => assert_ok(block_reader.uncles()),
1584 BlockField::BaseFeePerGas => assert_ok(block_reader.base_fee_per_gas()),
1585 BlockField::BlobGasUsed => assert_ok(block_reader.blob_gas_used()),
1586 BlockField::ExcessBlobGas => assert_ok(block_reader.excess_blob_gas()),
1587 BlockField::ParentBeaconBlockRoot => {
1588 assert_ok(block_reader.parent_beacon_block_root())
1589 }
1590 BlockField::WithdrawalsRoot => assert_ok(block_reader.withdrawals_root()),
1591 BlockField::Withdrawals => assert_ok(block_reader.withdrawals()),
1592 BlockField::L1BlockNumber => assert_ok(block_reader.l1_block_number()),
1593 BlockField::SendCount => assert_ok(block_reader.send_count()),
1594 BlockField::SendRoot => assert_ok(block_reader.send_root()),
1595 BlockField::MixHash => assert_ok(block_reader.mix_hash()),
1596 }
1597 }
1598 }
1599 }
1600
1601 println!("num_blocks: {}", num_blocks);
1602
1603 let mut num_traces = 0;
1604
1605 for batch in res.data.traces {
1606 for trace_reader in TraceReader::iter(&batch) {
1607 num_traces += 1;
1608
1609 for trace_field in TraceField::all() {
1610 match trace_field {
1611 TraceField::BlockHash => assert_ok(trace_reader.block_hash()),
1612 TraceField::BlockNumber => assert_ok(trace_reader.block_number()),
1613 TraceField::From => assert_ok(trace_reader.from()),
1614 TraceField::To => assert_ok(trace_reader.to()),
1615 TraceField::CallType => assert_ok(trace_reader.call_type()),
1616 TraceField::Gas => assert_ok(trace_reader.gas()),
1617 TraceField::Input => assert_ok(trace_reader.input()),
1618 TraceField::Init => assert_ok(trace_reader.init()),
1619 TraceField::Value => assert_ok(trace_reader.value()),
1620 TraceField::Author => assert_ok(trace_reader.author()),
1621 TraceField::RewardType => assert_ok(trace_reader.reward_type()),
1622 TraceField::Address => assert_ok(trace_reader.address()),
1623 TraceField::Code => assert_ok(trace_reader.code()),
1624 TraceField::GasUsed => assert_ok(trace_reader.gas_used()),
1625 TraceField::Output => assert_ok(trace_reader.output()),
1626 TraceField::Subtraces => assert_ok(trace_reader.subtraces()),
1627 TraceField::TraceAddress => assert_ok(trace_reader.trace_address()),
1628 TraceField::TransactionHash => assert_ok(trace_reader.transaction_hash()),
1629 TraceField::TransactionPosition => {
1630 assert_ok(trace_reader.transaction_position())
1631 }
1632 TraceField::Type => assert_ok(trace_reader.type_()),
1633 TraceField::Error => assert_ok(trace_reader.error()),
1634 TraceField::Sighash => assert_ok(trace_reader.sighash()),
1635 TraceField::ActionAddress => assert_ok(trace_reader.action_address()),
1636 TraceField::Balance => assert_ok(trace_reader.balance()),
1637 TraceField::RefundAddress => assert_ok(trace_reader.refund_address()),
1638 }
1639 }
1640 }
1641 }
1642
1643 println!("num_traces: {}", num_traces);
1644
1645 assert!(num_traces > 0, "no traces found");
1646 assert!(num_logs > 0, "no logs found");
1647 assert!(num_transactions > 0, "no transactions found");
1648 assert!(num_blocks > 0, "no blocks found");
1649
1650 Ok(())
1651 }
1652}