hypersync_client/
arrow_reader.rs

1//! Reader types for reading Arrow record batch data as native Rust types.
2//!
//! This module provides readers that borrow an Arrow record batch and read individual
//! fields directly from its columns. The batch itself is never copied; each accessor
//! converts only the requested value into an owned Rust type.
5
6use anyhow::Context;
7use arrow::{
8    array::{
9        Array, ArrayAccessor, BinaryArray, BooleanArray, RecordBatch, StringArray, UInt64Array,
10        UInt8Array,
11    },
12    datatypes::DataType,
13};
14use hypersync_format::{
15    AccessList, Address, Authorization, BlockNumber, Data, FixedSizeData, Hash, LogIndex, Quantity,
16    TransactionIndex, TransactionStatus, TransactionType, Withdrawal,
17};
18use hypersync_net_types::{BlockField, LogField, TraceField, TransactionField};
19
20type ColResult<T> = std::result::Result<T, ColumnError>;
21
22/// Error that occurs when trying to access a column in Arrow data.
23#[derive(Debug, thiserror::Error)]
24#[error("column {col_name} {err}")]
25pub struct ColumnError {
26    /// The name of the column that caused the error.
27    pub col_name: &'static str,
28    /// The specific type of column error that occurred.
29    pub err: ColumnErrorType,
30}
31
32impl ColumnError {
33    fn not_found(col_name: &'static str) -> Self {
34        Self {
35            col_name,
36            err: ColumnErrorType::NotFound,
37        }
38    }
39
40    fn wrong_type(
41        col_name: &'static str,
42        expected_type: &'static str,
43        actual_type: DataType,
44    ) -> Self {
45        Self {
46            col_name,
47            err: ColumnErrorType::WrongType {
48                expected_type,
49                actual_type,
50            },
51        }
52    }
53}
54
55/// The specific type of error that can occur when accessing a column.
56#[derive(Debug, thiserror::Error)]
57pub enum ColumnErrorType {
58    /// The column was not found in the Arrow schema.
59    #[error("not found")]
60    NotFound,
61    /// The column exists but has a different type than expected.
62    #[error("expected to be of type {expected_type} but found {actual_type}")]
63    WrongType {
64        /// The expected Arrow data type.
65        expected_type: &'static str,
66        /// The actual Arrow data type found in the schema.
67        actual_type: DataType,
68    },
69}
70
71fn column_as<'a, T: 'static>(batch: &'a RecordBatch, col_name: &'static str) -> ColResult<&'a T> {
72    match batch.column_by_name(col_name) {
73        None => Err(ColumnError::not_found(col_name)),
74        Some(c) => {
75            let Some(val) = c.as_any().downcast_ref::<T>() else {
76                let expected_type = std::any::type_name::<T>();
77                let actual_type = c.data_type().clone();
78                return Err(ColumnError::wrong_type(
79                    col_name,
80                    expected_type,
81                    actual_type,
82                ));
83            };
84            Ok(val)
85        }
86    }
87}
88
89/// Error that can occur when reading data from Arrow columns.
90#[derive(Debug, thiserror::Error)]
91pub enum ReadError {
92    /// A value was expected to be non-null but was null.
93    #[error("value was unexpectedly null")]
94    UnexpectedNull,
95    /// An error occurred while accessing a column.
96    #[error(transparent)]
97    ColumnError(#[from] ColumnError),
98    /// An error occurred during data type conversion.
99    #[error("{0:?}")]
100    ConversionError(anyhow::Error),
101}
102
103/// A reader for accessing individual row data from an Arrow RecordBatch.
104///
105/// This struct provides zero-copy access to columnar data in Arrow format,
106/// allowing efficient reading of specific fields from a single row.
107pub struct ArrowRowReader<'a> {
108    batch: &'a RecordBatch,
109    row_idx: usize,
110}
111
112impl<'a> ArrowRowReader<'a> {
113    /// Safely create a new reader for the given batch at row index and check
114    /// that row_idx is within the bounds of the batch.
115    fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result<Self> {
116        let len = if let Some(first_column) = batch.columns().first() {
117            first_column.len()
118        } else {
119            0
120        };
121        if row_idx >= len {
122            anyhow::bail!("row index out of bounds");
123        }
124
125        Ok(Self { batch, row_idx })
126    }
127
128    /// Read and convert the value at col_name that could be null
129    fn get_nullable<Col, T>(&self, col_name: &'static str) -> Result<Option<T>, ReadError>
130    where
131        Col: 'static,
132        &'a Col: ArrayAccessor,
133
134        <&'a Col as ArrayAccessor>::Item: TryInto<T>,
135        <<&'a Col as ArrayAccessor>::Item as TryInto<T>>::Error:
136            std::error::Error + Send + Sync + 'static,
137    {
138        let arr = column_as::<Col>(self.batch, col_name)?;
139
140        if arr.is_valid(self.row_idx) {
141            let value = arr.value(self.row_idx);
142            let converted: T = value
143                .try_into()
144                .map_err(|e| ReadError::ConversionError(anyhow::Error::new(e)))?;
145            Ok(Some(converted))
146        } else {
147            Ok(None)
148        }
149    }
150
151    /// Read and convert the value at col_name where it should not be null
152    fn get<Col, T>(&self, col_name: &'static str) -> Result<T, ReadError>
153    where
154        Col: 'static,
155        &'a Col: ArrayAccessor,
156
157        <&'a Col as ArrayAccessor>::Item: TryInto<T>,
158        <<&'a Col as ArrayAccessor>::Item as TryInto<T>>::Error:
159            std::error::Error + Send + Sync + 'static,
160    {
161        match self.get_nullable::<Col, T>(col_name) {
162            Ok(Some(val)) => Ok(val),
163            Ok(None) => Err(ReadError::UnexpectedNull),
164            Err(e) => Err(e),
165        }
166    }
167}
168
169/// Iterator over log rows in an RecordBatch.
170pub struct ArrowRowIterator<'a, R> {
171    batch: &'a RecordBatch,
172    current_idx: usize,
173    len: usize,
174    phantom: std::marker::PhantomData<R>,
175}
176
177impl<'a, R: From<ArrowRowReader<'a>>> ArrowRowIterator<'a, R> {
178    /// Create a new iterator for the given batch.
179    pub fn new(batch: &'a RecordBatch) -> Self {
180        let len = if let Some(first_column) = batch.columns().first() {
181            first_column.len()
182        } else {
183            0
184        };
185        Self {
186            batch,
187            current_idx: 0,
188            len,
189            phantom: std::marker::PhantomData,
190        }
191    }
192}
193
194impl<'a, R: From<ArrowRowReader<'a>>> Iterator for ArrowRowIterator<'a, R> {
195    type Item = R;
196
197    fn next(&mut self) -> Option<Self::Item> {
198        if self.current_idx < self.len {
199            let reader = ArrowRowReader {
200                batch: self.batch,
201                row_idx: self.current_idx,
202            };
203            self.current_idx += 1;
204            Some(reader.into())
205        } else {
206            None
207        }
208    }
209
210    fn size_hint(&self) -> (usize, Option<usize>) {
211        let remaining = self.len - self.current_idx;
212        (remaining, Some(remaining))
213    }
214}
215
216impl<'a, R: From<ArrowRowReader<'a>>> ExactSizeIterator for ArrowRowIterator<'a, R> {
217    fn len(&self) -> usize {
218        self.len - self.current_idx
219    }
220}
221
222/// Reader for log data from Arrow batches.
223///
224/// Provides efficient access to log fields without copying data from the underlying
225/// Arrow columnar format. Each reader is bound to a specific row in the batch.
226pub struct LogReader<'a> {
227    inner: ArrowRowReader<'a>,
228}
229
230impl<'a> From<ArrowRowReader<'a>> for LogReader<'a> {
231    fn from(inner: ArrowRowReader<'a>) -> Self {
232        Self { inner }
233    }
234}
235
236/// Iterator over log rows in an RecordBatch.
237pub type LogIterator<'a> = ArrowRowIterator<'a, LogReader<'a>>;
238
239impl<'a> LogReader<'a> {
240    /// Safely create a new reader for the given batch at row index and check
241    /// that row_idx is within the bounds of the batch.
242    pub fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result<Self> {
243        let inner = ArrowRowReader::new(batch, row_idx)?;
244        Ok(Self { inner })
245    }
246    /// Create an iterator over all rows in the batch.
247    pub fn iter(batch: &'a RecordBatch) -> LogIterator<'a> {
248        LogIterator::new(batch)
249    }
250
251    /// The boolean value indicating if the event was removed from the blockchain due
252    /// to a chain reorganization. True if the log was removed. False if it is a valid log.
253    pub fn removed(&self) -> Result<Option<bool>, ReadError> {
254        self.inner
255            .get_nullable::<BooleanArray, bool>(LogField::Removed.as_ref())
256    }
257
258    /// The integer identifying the index of the event within the block's list of events.
259    pub fn log_index(&self) -> Result<LogIndex, ReadError> {
260        self.inner
261            .get::<UInt64Array, LogIndex>(LogField::LogIndex.as_ref())
262    }
263
264    /// The integer index of the transaction within the block's list of transactions.
265    pub fn transaction_index(&self) -> Result<TransactionIndex, ReadError> {
266        self.inner
267            .get::<UInt64Array, TransactionIndex>(LogField::TransactionIndex.as_ref())
268    }
269
270    /// The hash of the transaction that triggered the event.
271    pub fn transaction_hash(&self) -> Result<Hash, ReadError> {
272        self.inner
273            .get::<BinaryArray, Hash>(LogField::TransactionHash.as_ref())
274    }
275
276    /// The hash of the block in which the event was included.
277    pub fn block_hash(&self) -> Result<Hash, ReadError> {
278        self.inner
279            .get::<BinaryArray, Hash>(LogField::BlockHash.as_ref())
280    }
281
282    /// The block number in which the event was included.
283    pub fn block_number(&self) -> Result<BlockNumber, ReadError> {
284        self.inner
285            .get::<UInt64Array, BlockNumber>(LogField::BlockNumber.as_ref())
286    }
287
288    /// The contract address from which the event originated.
289    pub fn address(&self) -> Result<Address, ReadError> {
290        self.inner
291            .get::<BinaryArray, Address>(LogField::Address.as_ref())
292    }
293
294    /// The first topic of the event (topic0).
295    pub fn topic0(&self) -> Result<Option<FixedSizeData<32>>, ReadError> {
296        self.inner
297            .get_nullable::<BinaryArray, FixedSizeData<32>>(LogField::Topic0.as_ref())
298    }
299
300    /// The second topic of the event (topic1).
301    pub fn topic1(&self) -> Result<Option<FixedSizeData<32>>, ReadError> {
302        self.inner
303            .get_nullable::<BinaryArray, FixedSizeData<32>>(LogField::Topic1.as_ref())
304    }
305
306    /// The third topic of the event (topic2).
307    pub fn topic2(&self) -> Result<Option<FixedSizeData<32>>, ReadError> {
308        self.inner
309            .get_nullable::<BinaryArray, FixedSizeData<32>>(LogField::Topic2.as_ref())
310    }
311
312    /// The fourth topic of the event (topic3).
313    pub fn topic3(&self) -> Result<Option<FixedSizeData<32>>, ReadError> {
314        self.inner
315            .get_nullable::<BinaryArray, FixedSizeData<32>>(LogField::Topic3.as_ref())
316    }
317
318    /// The non-indexed data that was emitted along with the event.
319    pub fn data(&self) -> Result<Data, ReadError> {
320        self.inner.get::<BinaryArray, Data>(LogField::Data.as_ref())
321    }
322}
323
324/// Reader for block data from Arrow batches.
325///
326/// Provides efficient access to block fields without copying data from the underlying
327/// Arrow columnar format. Each reader is bound to a specific row in the batch.
328pub struct BlockReader<'a> {
329    inner: ArrowRowReader<'a>,
330}
331
332impl<'a> From<ArrowRowReader<'a>> for BlockReader<'a> {
333    fn from(inner: ArrowRowReader<'a>) -> Self {
334        Self { inner }
335    }
336}
337
338/// Iterator over block rows in an RecordBatch.
339pub type BlockIterator<'a> = ArrowRowIterator<'a, BlockReader<'a>>;
340
341impl<'a> BlockReader<'a> {
342    /// Safely create a new reader for the given batch at row index and check
343    /// that row_idx is within the bounds of the batch.
344    pub fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result<Self> {
345        let inner = ArrowRowReader::new(batch, row_idx)?;
346        Ok(Self { inner })
347    }
348
349    /// Create an iterator over all rows in the batch.
350    pub fn iter(batch: &'a RecordBatch) -> BlockIterator<'a> {
351        BlockIterator::new(batch)
352    }
353
354    /// The block number.
355    pub fn number(&self) -> Result<u64, ReadError> {
356        self.inner
357            .get::<UInt64Array, _>(BlockField::Number.as_ref())
358    }
359
360    /// The block hash.
361    pub fn hash(&self) -> Result<Hash, ReadError> {
362        self.inner
363            .get::<BinaryArray, Hash>(BlockField::Hash.as_ref())
364    }
365
366    /// The parent block hash.
367    pub fn parent_hash(&self) -> Result<Hash, ReadError> {
368        self.inner
369            .get::<BinaryArray, Hash>(BlockField::ParentHash.as_ref())
370    }
371
372    /// The block nonce.
373    pub fn nonce(&self) -> Result<Option<FixedSizeData<8>>, ReadError> {
374        self.inner
375            .get_nullable::<BinaryArray, FixedSizeData<8>>(BlockField::Nonce.as_ref())
376    }
377
378    /// The SHA3 hash of the uncles.
379    pub fn sha3_uncles(&self) -> Result<Hash, ReadError> {
380        self.inner
381            .get::<BinaryArray, Hash>(BlockField::Sha3Uncles.as_ref())
382    }
383
384    /// The Bloom filter for the logs of the block.
385    pub fn logs_bloom(&self) -> Result<Data, ReadError> {
386        self.inner
387            .get::<BinaryArray, Data>(BlockField::LogsBloom.as_ref())
388    }
389
390    /// The root of the transaction trie of the block.
391    pub fn transactions_root(&self) -> Result<Hash, ReadError> {
392        self.inner
393            .get::<BinaryArray, Hash>(BlockField::TransactionsRoot.as_ref())
394    }
395
396    /// The root of the final state trie of the block.
397    pub fn state_root(&self) -> Result<Hash, ReadError> {
398        self.inner
399            .get::<BinaryArray, Hash>(BlockField::StateRoot.as_ref())
400    }
401
402    /// The root of the receipts trie of the block.
403    pub fn receipts_root(&self) -> Result<Hash, ReadError> {
404        self.inner
405            .get::<BinaryArray, Hash>(BlockField::ReceiptsRoot.as_ref())
406    }
407
408    /// The address of the beneficiary to whom the mining rewards were given.
409    pub fn miner(&self) -> Result<Address, ReadError> {
410        self.inner
411            .get::<BinaryArray, Address>(BlockField::Miner.as_ref())
412    }
413
414    /// The difficulty of the block.
415    pub fn difficulty(&self) -> Result<Option<Quantity>, ReadError> {
416        self.inner
417            .get_nullable::<BinaryArray, Quantity>(BlockField::Difficulty.as_ref())
418    }
419
420    /// The total difficulty of the chain until this block.
421    pub fn total_difficulty(&self) -> Result<Option<Quantity>, ReadError> {
422        self.inner
423            .get_nullable::<BinaryArray, Quantity>(BlockField::TotalDifficulty.as_ref())
424    }
425
426    /// The "extra data" field of this block.
427    pub fn extra_data(&self) -> Result<Data, ReadError> {
428        self.inner
429            .get::<BinaryArray, Data>(BlockField::ExtraData.as_ref())
430    }
431
432    /// The size of this block in bytes.
433    pub fn size(&self) -> Result<Quantity, ReadError> {
434        self.inner
435            .get::<BinaryArray, Quantity>(BlockField::Size.as_ref())
436    }
437
438    /// The maximum gas allowed in this block.
439    pub fn gas_limit(&self) -> Result<Quantity, ReadError> {
440        self.inner
441            .get::<BinaryArray, Quantity>(BlockField::GasLimit.as_ref())
442    }
443
444    /// The total used gas by all transactions in this block.
445    pub fn gas_used(&self) -> Result<Quantity, ReadError> {
446        self.inner
447            .get::<BinaryArray, Quantity>(BlockField::GasUsed.as_ref())
448    }
449
450    /// The unix timestamp for when the block was collated.
451    pub fn timestamp(&self) -> Result<Quantity, ReadError> {
452        self.inner
453            .get::<BinaryArray, Quantity>(BlockField::Timestamp.as_ref())
454    }
455
456    /// Array of uncle hashes.
457    pub fn uncles(&self) -> Result<Option<Vec<FixedSizeData<32>>>, ReadError> {
458        let all = self
459            .inner
460            .get_nullable::<BinaryArray, Data>(BlockField::Uncles.as_ref())?;
461        let Some(data) = all else {
462            return Ok(None);
463        };
464        let mut uncles = Vec::new();
465        for uncle_bytes in data.chunks(32) {
466            let uncle = FixedSizeData::<32>::try_from(uncle_bytes)
467                .context("convert uncle bytes to uncle")
468                .map_err(ReadError::ConversionError)?;
469            uncles.push(uncle);
470        }
471        Ok(Some(uncles))
472    }
473
474    /// The base fee per gas.
475    pub fn base_fee_per_gas(&self) -> Result<Option<Quantity>, ReadError> {
476        self.inner
477            .get_nullable::<BinaryArray, Quantity>(BlockField::BaseFeePerGas.as_ref())
478    }
479
480    /// The total amount of blob gas consumed by the transactions in the block.
481    pub fn blob_gas_used(&self) -> Result<Option<Quantity>, ReadError> {
482        self.inner
483            .get_nullable::<BinaryArray, Quantity>(BlockField::BlobGasUsed.as_ref())
484    }
485
486    /// A running total of blob gas consumed in excess of the target.
487    pub fn excess_blob_gas(&self) -> Result<Option<Quantity>, ReadError> {
488        self.inner
489            .get_nullable::<BinaryArray, Quantity>(BlockField::ExcessBlobGas.as_ref())
490    }
491
492    /// The hash of the parent beacon block.
493    pub fn parent_beacon_block_root(&self) -> Result<Option<Hash>, ReadError> {
494        self.inner
495            .get_nullable::<BinaryArray, Hash>(BlockField::ParentBeaconBlockRoot.as_ref())
496    }
497
498    /// The root of the withdrawal trie.
499    pub fn withdrawals_root(&self) -> Result<Option<Hash>, ReadError> {
500        self.inner
501            .get_nullable::<BinaryArray, Hash>(BlockField::WithdrawalsRoot.as_ref())
502    }
503
504    /// The withdrawals in the block.
505    pub fn withdrawals(&self) -> Result<Option<Vec<Withdrawal>>, ReadError> {
506        let withdrawals_bin = self
507            .inner
508            .get_nullable::<BinaryArray, Data>(BlockField::Withdrawals.as_ref())?;
509        let Some(withdrawals_bin) = withdrawals_bin else {
510            return Ok(None);
511        };
512
513        let deser = bincode::deserialize(&withdrawals_bin)
514            .context("deserialize withdrawals")
515            .map_err(ReadError::ConversionError)?;
516
517        Ok(Some(deser))
518    }
519
520    /// The L1 block number.
521    pub fn l1_block_number(&self) -> Result<Option<BlockNumber>, ReadError> {
522        self.inner
523            .get_nullable::<UInt64Array, _>(BlockField::L1BlockNumber.as_ref())
524    }
525
526    /// The send count.
527    pub fn send_count(&self) -> Result<Option<Quantity>, ReadError> {
528        self.inner
529            .get_nullable::<BinaryArray, Quantity>(BlockField::SendCount.as_ref())
530    }
531
532    /// The send root.
533    pub fn send_root(&self) -> Result<Option<Hash>, ReadError> {
534        self.inner
535            .get_nullable::<BinaryArray, Hash>(BlockField::SendRoot.as_ref())
536    }
537
538    /// The mix hash.
539    pub fn mix_hash(&self) -> Result<Option<Hash>, ReadError> {
540        self.inner
541            .get_nullable::<BinaryArray, Hash>(BlockField::MixHash.as_ref())
542    }
543}
544
545/// Reader for transaction data from Arrow batches.
546///
547/// Provides efficient access to transaction fields without copying data from the underlying
548/// Arrow columnar format. Each reader is bound to a specific row in the batch.
549pub struct TransactionReader<'a> {
550    inner: ArrowRowReader<'a>,
551}
552
553impl<'a> From<ArrowRowReader<'a>> for TransactionReader<'a> {
554    fn from(inner: ArrowRowReader<'a>) -> Self {
555        Self { inner }
556    }
557}
558
559/// Iterator over transaction rows in an RecordBatch.
560pub type TransactionIterator<'a> = ArrowRowIterator<'a, TransactionReader<'a>>;
561
562impl<'a> TransactionReader<'a> {
563    /// Safely create a new reader for the given batch at row index and check
564    /// that row_idx is within the bounds of the batch.
565    pub fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result<Self> {
566        let inner = ArrowRowReader::new(batch, row_idx)?;
567        Ok(Self { inner })
568    }
569
570    /// Create an iterator over all rows in the batch.
571    pub fn iter(batch: &'a RecordBatch) -> TransactionIterator<'a> {
572        TransactionIterator::new(batch)
573    }
574
575    /// The hash of the block in which this transaction was included.
576    pub fn block_hash(&self) -> Result<Hash, ReadError> {
577        self.inner
578            .get::<BinaryArray, Hash>(TransactionField::BlockHash.as_ref())
579    }
580
581    /// The number of the block in which this transaction was included.
582    pub fn block_number(&self) -> Result<BlockNumber, ReadError> {
583        self.inner
584            .get::<UInt64Array, BlockNumber>(TransactionField::BlockNumber.as_ref())
585    }
586
587    /// The address of the sender.
588    pub fn from(&self) -> Result<Option<Address>, ReadError> {
589        self.inner
590            .get_nullable::<BinaryArray, Address>(TransactionField::From.as_ref())
591    }
592
593    /// The gas limit provided by the sender.
594    pub fn gas(&self) -> Result<Quantity, ReadError> {
595        self.inner
596            .get::<BinaryArray, Quantity>(TransactionField::Gas.as_ref())
597    }
598
599    /// The gas price willing to be paid by the sender.
600    pub fn gas_price(&self) -> Result<Option<Quantity>, ReadError> {
601        self.inner
602            .get_nullable::<BinaryArray, Quantity>(TransactionField::GasPrice.as_ref())
603    }
604
605    /// The hash of this transaction.
606    pub fn hash(&self) -> Result<Hash, ReadError> {
607        self.inner
608            .get::<BinaryArray, Hash>(TransactionField::Hash.as_ref())
609    }
610
611    /// The data sent along with the transaction.
612    pub fn input(&self) -> Result<Data, ReadError> {
613        self.inner
614            .get::<BinaryArray, Data>(TransactionField::Input.as_ref())
615    }
616
617    /// The number of transactions made by the sender prior to this one.
618    pub fn nonce(&self) -> Result<Quantity, ReadError> {
619        self.inner
620            .get::<BinaryArray, Quantity>(TransactionField::Nonce.as_ref())
621    }
622
623    /// The address of the receiver.
624    pub fn to(&self) -> Result<Option<Address>, ReadError> {
625        self.inner
626            .get_nullable::<BinaryArray, Address>(TransactionField::To.as_ref())
627    }
628
629    /// The index of the transaction in the block.
630    pub fn transaction_index(&self) -> Result<TransactionIndex, ReadError> {
631        self.inner
632            .get::<UInt64Array, TransactionIndex>(TransactionField::TransactionIndex.as_ref())
633    }
634
635    /// The value transferred.
636    pub fn value(&self) -> Result<Quantity, ReadError> {
637        self.inner
638            .get::<BinaryArray, Quantity>(TransactionField::Value.as_ref())
639    }
640
641    /// ECDSA recovery id.
642    pub fn v(&self) -> Result<Option<Quantity>, ReadError> {
643        self.inner
644            .get_nullable::<BinaryArray, Quantity>(TransactionField::V.as_ref())
645    }
646
647    /// ECDSA signature r.
648    pub fn r(&self) -> Result<Option<Quantity>, ReadError> {
649        self.inner
650            .get_nullable::<BinaryArray, Quantity>(TransactionField::R.as_ref())
651    }
652
653    /// ECDSA signature s.
654    pub fn s(&self) -> Result<Option<Quantity>, ReadError> {
655        self.inner
656            .get_nullable::<BinaryArray, Quantity>(TransactionField::S.as_ref())
657    }
658
659    /// Maximum fee per gas the sender is willing to pay for priority.
660    pub fn max_priority_fee_per_gas(&self) -> Result<Option<Quantity>, ReadError> {
661        self.inner
662            .get_nullable::<BinaryArray, Quantity>(TransactionField::MaxPriorityFeePerGas.as_ref())
663    }
664
665    /// Maximum total fee per gas the sender is willing to pay.
666    pub fn max_fee_per_gas(&self) -> Result<Option<Quantity>, ReadError> {
667        self.inner
668            .get_nullable::<BinaryArray, Quantity>(TransactionField::MaxFeePerGas.as_ref())
669    }
670
671    /// The chain id of the transaction.
672    pub fn chain_id(&self) -> Result<Option<Quantity>, ReadError> {
673        self.inner
674            .get_nullable::<BinaryArray, Quantity>(TransactionField::ChainId.as_ref())
675    }
676
677    /// The total amount of gas used when this transaction was executed in the block.
678    pub fn cumulative_gas_used(&self) -> Result<Quantity, ReadError> {
679        self.inner
680            .get::<BinaryArray, Quantity>(TransactionField::CumulativeGasUsed.as_ref())
681    }
682
683    /// The sum of the base fee and tip paid per unit of gas.
684    pub fn effective_gas_price(&self) -> Result<Quantity, ReadError> {
685        self.inner
686            .get::<BinaryArray, Quantity>(TransactionField::EffectiveGasPrice.as_ref())
687    }
688
689    /// The amount of gas used by this transaction.
690    pub fn gas_used(&self) -> Result<Quantity, ReadError> {
691        self.inner
692            .get::<BinaryArray, Quantity>(TransactionField::GasUsed.as_ref())
693    }
694
695    /// The contract address created, if the transaction was a contract creation.
696    pub fn contract_address(&self) -> Result<Option<Address>, ReadError> {
697        self.inner
698            .get_nullable::<BinaryArray, Address>(TransactionField::ContractAddress.as_ref())
699    }
700
701    /// The Bloom filter for the logs of the transaction.
702    pub fn logs_bloom(&self) -> Result<Data, ReadError> {
703        self.inner
704            .get::<BinaryArray, Data>(TransactionField::LogsBloom.as_ref())
705    }
706
707    /// The type of the transaction.
708    pub fn type_(&self) -> Result<Option<TransactionType>, ReadError> {
709        let type_ = self
710            .inner
711            .get_nullable::<UInt8Array, u8>(TransactionField::Type.as_ref())?;
712        Ok(type_.map(TransactionType::from))
713    }
714
715    /// The post-transaction stateroot.
716    pub fn root(&self) -> Result<Option<Hash>, ReadError> {
717        self.inner
718            .get_nullable::<BinaryArray, Hash>(TransactionField::Root.as_ref())
719    }
720
721    /// Either 1 (success) or 0 (failure).
722    pub fn status(&self) -> Result<Option<TransactionStatus>, ReadError> {
723        let status = self
724            .inner
725            .get_nullable::<UInt8Array, u8>(TransactionField::Status.as_ref())?;
726        let Some(status) = status else {
727            return Ok(None);
728        };
729        let status = TransactionStatus::from_u8(status)
730            .context("convert u8 to transaction status")
731            .map_err(ReadError::ConversionError)?;
732        Ok(Some(status))
733    }
734
735    /// The first 4 bytes of the transaction input data.
736    pub fn sighash(&self) -> Result<Option<Data>, ReadError> {
737        self.inner
738            .get_nullable::<BinaryArray, _>(TransactionField::Sighash.as_ref())
739    }
740
741    /// The y parity of the signature.
742    pub fn y_parity(&self) -> Result<Option<Quantity>, ReadError> {
743        self.inner
744            .get_nullable::<BinaryArray, Quantity>(TransactionField::YParity.as_ref())
745    }
746
747    /// The access list.
748    pub fn access_list(&self) -> Result<Option<Vec<AccessList>>, ReadError> {
749        let bin = self
750            .inner
751            .get_nullable::<BinaryArray, Data>(TransactionField::AccessList.as_ref())?;
752        let Some(bin) = bin else {
753            return Ok(None);
754        };
755        let deser = bincode::deserialize(&bin)
756            .context("deserialize access list")
757            .map_err(ReadError::ConversionError)?;
758        Ok(Some(deser))
759    }
760
761    /// The authorization list.
762    pub fn authorization_list(&self) -> Result<Option<Vec<Authorization>>, ReadError> {
763        let bin = self
764            .inner
765            .get_nullable::<BinaryArray, Data>(TransactionField::AuthorizationList.as_ref())?;
766        let Some(bin) = bin else {
767            return Ok(None);
768        };
769        let deser = bincode::deserialize(&bin)
770            .context("deserialize authorization list")
771            .map_err(ReadError::ConversionError)?;
772        Ok(Some(deser))
773    }
774
    // L1/L2- and blob-related transaction fields.
777
778    /// The L1 fee for L2 transactions.
779    pub fn l1_fee(&self) -> Result<Option<Quantity>, ReadError> {
780        self.inner
781            .get_nullable::<BinaryArray, Quantity>(TransactionField::L1Fee.as_ref())
782    }
783
784    /// The maximum fee per blob gas.
785    pub fn max_fee_per_blob_gas(&self) -> Result<Option<Quantity>, ReadError> {
786        self.inner
787            .get_nullable::<BinaryArray, Quantity>(TransactionField::MaxFeePerBlobGas.as_ref())
788    }
789
790    /// The blob versioned hashes.
791    pub fn blob_versioned_hashes(&self) -> Result<Option<Vec<Hash>>, ReadError> {
792        let bin = self
793            .inner
794            .get_nullable::<BinaryArray, Data>(TransactionField::BlobVersionedHashes.as_ref())?;
795        let Some(bin) = bin else {
796            return Ok(None);
797        };
798        let mut hashes = Vec::new();
799        for hash_bytes in bin.chunks(32) {
800            let hash = Hash::try_from(hash_bytes)
801                .context("convert blob versioned hash bytes to hash")
802                .map_err(ReadError::ConversionError)?;
803            hashes.push(hash);
804        }
805        Ok(Some(hashes))
806    }
807
808    /// The L1 gas price for L2 transactions.
809    pub fn l1_gas_price(&self) -> Result<Option<Quantity>, ReadError> {
810        self.inner
811            .get_nullable::<BinaryArray, Quantity>(TransactionField::L1GasPrice.as_ref())
812    }
813
814    /// The L1 gas used for L2 transactions.
815    pub fn l1_gas_used(&self) -> Result<Option<Quantity>, ReadError> {
816        self.inner
817            .get_nullable::<BinaryArray, Quantity>(TransactionField::L1GasUsed.as_ref())
818    }
819
820    /// The L1 fee scalar for L2 transactions.
821    pub fn l1_fee_scalar(&self) -> Result<Option<f64>, ReadError> {
822        let scalar = self
823            .inner
824            .get_nullable::<BinaryArray, Quantity>(TransactionField::L1FeeScalar.as_ref())?;
825        let Some(scalar_utf8) = scalar else {
826            return Ok(None);
827        };
828        // stored as a string of float eg 0.69 (utf8 encoded)
829        let scalar_str = std::str::from_utf8(&scalar_utf8)
830            .context("convert l1 fee scalar to string")
831            .map_err(ReadError::ConversionError)?;
832
833        let scalar_f64: f64 = scalar_str
834            .parse()
835            .context("parse l1 fee scalar as f64")
836            .map_err(ReadError::ConversionError)?;
837        Ok(Some(scalar_f64))
838    }
839
840    /// The gas used for L1 for L2 transactions.
841    pub fn gas_used_for_l1(&self) -> Result<Option<Quantity>, ReadError> {
842        self.inner
843            .get_nullable::<BinaryArray, Quantity>(TransactionField::GasUsedForL1.as_ref())
844    }
845
846    /// The blob gas price.
847    pub fn blob_gas_price(&self) -> Result<Option<Quantity>, ReadError> {
848        self.inner
849            .get_nullable::<BinaryArray, Quantity>(TransactionField::BlobGasPrice.as_ref())
850    }
851
852    /// The blob gas used.
853    pub fn blob_gas_used(&self) -> Result<Option<Quantity>, ReadError> {
854        self.inner
855            .get_nullable::<BinaryArray, Quantity>(TransactionField::BlobGasUsed.as_ref())
856    }
857
858    /// The deposit nonce for deposit transactions.
859    pub fn deposit_nonce(&self) -> Result<Option<Quantity>, ReadError> {
860        self.inner
861            .get_nullable::<BinaryArray, Quantity>(TransactionField::DepositNonce.as_ref())
862    }
863
864    /// The deposit receipt version for deposit transactions.
865    pub fn deposit_receipt_version(&self) -> Result<Option<Quantity>, ReadError> {
866        self.inner
867            .get_nullable::<BinaryArray, Quantity>(TransactionField::DepositReceiptVersion.as_ref())
868    }
869
870    /// The L1 base fee scalar for L2 transactions.
871    pub fn l1_base_fee_scalar(&self) -> Result<Option<Quantity>, ReadError> {
872        self.inner
873            .get_nullable::<BinaryArray, Quantity>(TransactionField::L1BaseFeeScalar.as_ref())
874    }
875
876    /// The L1 blob base fee for L2 transactions.
877    pub fn l1_blob_base_fee(&self) -> Result<Option<Quantity>, ReadError> {
878        self.inner
879            .get_nullable::<BinaryArray, Quantity>(TransactionField::L1BlobBaseFee.as_ref())
880    }
881
882    /// The L1 blob base fee scalar for L2 transactions.
883    pub fn l1_blob_base_fee_scalar(&self) -> Result<Option<Quantity>, ReadError> {
884        self.inner
885            .get_nullable::<BinaryArray, Quantity>(TransactionField::L1BlobBaseFeeScalar.as_ref())
886    }
887
888    /// The L1 block number for L2 transactions.
889    pub fn l1_block_number(&self) -> Result<Option<Quantity>, ReadError> {
890        self.inner
891            .get_nullable::<BinaryArray, _>(TransactionField::L1BlockNumber.as_ref())
892    }
893
894    /// The mint value for deposit transactions.
895    pub fn mint(&self) -> Result<Option<Quantity>, ReadError> {
896        self.inner
897            .get_nullable::<BinaryArray, Quantity>(TransactionField::Mint.as_ref())
898    }
899
900    /// The source hash for deposit transactions.
901    pub fn source_hash(&self) -> Result<Option<Hash>, ReadError> {
902        self.inner
903            .get_nullable::<BinaryArray, Hash>(TransactionField::SourceHash.as_ref())
904    }
905}
906
907/// Reader for trace data from Arrow batches.
908///
909/// Provides efficient access to trace fields without copying data from the underlying
910/// Arrow columnar format. Each reader is bound to a specific row in the batch.
911pub struct TraceReader<'a> {
912    inner: ArrowRowReader<'a>,
913}
914
915impl<'a> From<ArrowRowReader<'a>> for TraceReader<'a> {
916    fn from(inner: ArrowRowReader<'a>) -> Self {
917        Self { inner }
918    }
919}
920
/// Iterator over trace rows in a RecordBatch.
pub type TraceIterator<'a> = ArrowRowIterator<'a, TraceReader<'a>>;
923
924impl<'a> TraceReader<'a> {
925    /// Safely create a new reader for the given batch at row index and check
926    /// that row_idx is within the bounds of the batch.
927    pub fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result<Self> {
928        let inner = ArrowRowReader::new(batch, row_idx)?;
929        Ok(Self { inner })
930    }
931
932    /// Create an iterator over all rows in the batch.
933    pub fn iter(batch: &'a RecordBatch) -> TraceIterator<'a> {
934        TraceIterator::new(batch)
935    }
936
937    /// The hash of the block in which this trace occurred.
938    pub fn block_hash(&self) -> Result<Hash, ReadError> {
939        self.inner
940            .get::<BinaryArray, Hash>(TraceField::BlockHash.as_ref())
941    }
942
943    /// The number of the block in which this trace occurred.
944    pub fn block_number(&self) -> Result<u64, ReadError> {
945        self.inner
946            .get::<UInt64Array, _>(TraceField::BlockNumber.as_ref())
947    }
948
949    /// The address from which the trace originated.
950    pub fn from(&self) -> Result<Option<Address>, ReadError> {
951        self.inner
952            .get_nullable::<BinaryArray, Address>(TraceField::From.as_ref())
953    }
954
955    /// The address to which the trace was sent.
956    pub fn to(&self) -> Result<Option<Address>, ReadError> {
957        self.inner
958            .get_nullable::<BinaryArray, Address>(TraceField::To.as_ref())
959    }
960
961    /// The type of call.
962    pub fn call_type(&self) -> Result<Option<String>, ReadError> {
963        self.inner
964            .get_nullable::<StringArray, String>(TraceField::CallType.as_ref())
965    }
966
967    /// The amount of gas provided to the trace.
968    pub fn gas(&self) -> Result<Option<Quantity>, ReadError> {
969        self.inner
970            .get_nullable::<BinaryArray, Quantity>(TraceField::Gas.as_ref())
971    }
972
973    /// The input data.
974    pub fn input(&self) -> Result<Option<Data>, ReadError> {
975        self.inner
976            .get_nullable::<BinaryArray, Data>(TraceField::Input.as_ref())
977    }
978
979    /// The init data for contract creation traces.
980    pub fn init(&self) -> Result<Option<Data>, ReadError> {
981        self.inner
982            .get_nullable::<BinaryArray, Data>(TraceField::Init.as_ref())
983    }
984
985    /// The value transferred.
986    pub fn value(&self) -> Result<Option<Quantity>, ReadError> {
987        self.inner
988            .get_nullable::<BinaryArray, Quantity>(TraceField::Value.as_ref())
989    }
990
991    /// The address of the author (miner).
992    pub fn author(&self) -> Result<Option<Address>, ReadError> {
993        self.inner
994            .get_nullable::<BinaryArray, Address>(TraceField::Author.as_ref())
995    }
996
997    /// The type of reward.
998    pub fn reward_type(&self) -> Result<Option<String>, ReadError> {
999        self.inner
1000            .get_nullable::<StringArray, String>(TraceField::RewardType.as_ref())
1001    }
1002
1003    /// The address involved in the trace.
1004    pub fn address(&self) -> Result<Option<Address>, ReadError> {
1005        self.inner
1006            .get_nullable::<BinaryArray, Address>(TraceField::Address.as_ref())
1007    }
1008
1009    /// The bytecode.
1010    pub fn code(&self) -> Result<Option<Data>, ReadError> {
1011        self.inner
1012            .get_nullable::<BinaryArray, Data>(TraceField::Code.as_ref())
1013    }
1014
1015    /// The amount of gas used by the trace.
1016    pub fn gas_used(&self) -> Result<Option<Quantity>, ReadError> {
1017        self.inner
1018            .get_nullable::<BinaryArray, Quantity>(TraceField::GasUsed.as_ref())
1019    }
1020
1021    /// The output data.
1022    pub fn output(&self) -> Result<Option<Data>, ReadError> {
1023        self.inner
1024            .get_nullable::<BinaryArray, Data>(TraceField::Output.as_ref())
1025    }
1026
1027    /// The number of sub-traces.
1028    pub fn subtraces(&self) -> Result<Option<u64>, ReadError> {
1029        self.inner
1030            .get_nullable::<UInt64Array, u64>(TraceField::Subtraces.as_ref())
1031    }
1032
1033    /// The trace address.
1034    pub fn trace_address(&self) -> Result<Option<Vec<u64>>, ReadError> {
1035        let bin = self
1036            .inner
1037            .get_nullable::<BinaryArray, Data>(TraceField::TraceAddress.as_ref())?;
1038        let Some(bin) = bin else {
1039            return Ok(None);
1040        };
1041        let deser = bincode::deserialize(&bin)
1042            .context("deserialize trace address")
1043            .map_err(ReadError::ConversionError)?;
1044        Ok(Some(deser))
1045    }
1046
1047    /// The hash of the transaction this trace belongs to.
1048    pub fn transaction_hash(&self) -> Result<Option<Hash>, ReadError> {
1049        self.inner
1050            .get_nullable::<BinaryArray, Hash>(TraceField::TransactionHash.as_ref())
1051    }
1052
1053    /// The position of the transaction in the block.
1054    pub fn transaction_position(&self) -> Result<Option<u64>, ReadError> {
1055        self.inner
1056            .get_nullable::<UInt64Array, u64>(TraceField::TransactionPosition.as_ref())
1057    }
1058
1059    /// The type of trace.
1060    pub fn type_(&self) -> Result<Option<String>, ReadError> {
1061        self.inner
1062            .get_nullable::<StringArray, String>(TraceField::Type.as_ref())
1063    }
1064
1065    /// The error message, if any.
1066    pub fn error(&self) -> Result<Option<String>, ReadError> {
1067        self.inner
1068            .get_nullable::<StringArray, String>(TraceField::Error.as_ref())
1069    }
1070
1071    /// The first 4 bytes of the input data.
1072    pub fn sighash(&self) -> Result<Option<Data>, ReadError> {
1073        self.inner
1074            .get_nullable::<BinaryArray, _>(TraceField::Sighash.as_ref())
1075    }
1076
1077    /// The action address.
1078    pub fn action_address(&self) -> Result<Option<Address>, ReadError> {
1079        self.inner
1080            .get_nullable::<BinaryArray, Address>(TraceField::ActionAddress.as_ref())
1081    }
1082
1083    /// The balance.
1084    pub fn balance(&self) -> Result<Option<Quantity>, ReadError> {
1085        self.inner
1086            .get_nullable::<BinaryArray, Quantity>(TraceField::Balance.as_ref())
1087    }
1088
1089    /// The refund address.
1090    pub fn refund_address(&self) -> Result<Option<Address>, ReadError> {
1091        self.inner
1092            .get_nullable::<BinaryArray, Address>(TraceField::RefundAddress.as_ref())
1093    }
1094}
1095
1096#[cfg(test)]
1097mod tests {
1098    use super::*;
1099    use anyhow::Context;
1100
1101    trait NotOption {}
1102
1103    impl NotOption for hypersync_format::Quantity {}
1104    impl NotOption for hypersync_format::Data {}
1105    impl NotOption for hypersync_format::UInt {}
1106    impl<const N: usize> NotOption for hypersync_format::FixedSizeData<N> {}
1107    impl NotOption for u64 {}
1108
1109    /// Compile-time tests that ensure the correct return types
1110    #[test]
1111    fn test_nullability_matches_schema() {
1112        fn assert_nullable<'a, T, F>(_: F, log_field: LogField)
1113        where
1114            F: FnOnce(&LogReader<'a>) -> Result<Option<T>, ReadError>,
1115        {
1116            assert!(log_field.is_nullable(), "Optional type should be nullable");
1117        }
1118
1119        fn assert_not_nullable<'a, T, F>(_: F, log_field: LogField)
1120        where
1121            F: FnOnce(&LogReader<'a>) -> Result<T, ReadError>,
1122            // just to make sure its an inner type and not an Option
1123            T: NotOption,
1124        {
1125            assert!(!log_field.is_nullable(), "should not be nullable");
1126        }
1127        // This test will fail to compile if the return types are wrong
1128
1129        for field in LogField::all() {
1130            match field {
1131                LogField::Removed => assert_nullable(LogReader::removed, field),
1132                LogField::Topic0 => assert_nullable(LogReader::topic0, field),
1133                LogField::Topic1 => assert_nullable(LogReader::topic1, field),
1134                LogField::Topic2 => assert_nullable(LogReader::topic2, field),
1135                LogField::Topic3 => assert_nullable(LogReader::topic3, field),
1136                LogField::LogIndex => assert_not_nullable(LogReader::log_index, field),
1137                LogField::TransactionIndex => {
1138                    assert_not_nullable(LogReader::transaction_index, field)
1139                }
1140                LogField::TransactionHash => {
1141                    assert_not_nullable(LogReader::transaction_hash, field)
1142                }
1143                LogField::BlockHash => assert_not_nullable(LogReader::block_hash, field),
1144                LogField::BlockNumber => assert_not_nullable(LogReader::block_number, field),
1145                LogField::Address => assert_not_nullable(LogReader::address, field),
1146                LogField::Data => assert_not_nullable(LogReader::data, field),
1147            }
1148        }
1149    }
1150
1151    #[test]
1152    fn test_block_nullability_matches_schema() {
1153        fn assert_nullable<'a, T, F>(_: F, block_field: BlockField)
1154        where
1155            F: FnOnce(&BlockReader<'a>) -> Result<Option<T>, ReadError>,
1156        {
1157            assert!(
1158                block_field.is_nullable(),
1159                "Optional type should be nullable"
1160            );
1161        }
1162
1163        fn assert_not_nullable<'a, T, F>(_: F, block_field: BlockField)
1164        where
1165            F: FnOnce(&BlockReader<'a>) -> Result<T, ReadError>,
1166            T: NotOption,
1167        {
1168            assert!(!block_field.is_nullable(), "should not be nullable");
1169        }
1170
1171        for field in BlockField::all() {
1172            match field {
1173                // Nullable fields
1174                BlockField::Nonce => assert_nullable(BlockReader::nonce, field),
1175                BlockField::Difficulty => assert_nullable(BlockReader::difficulty, field),
1176                BlockField::TotalDifficulty => {
1177                    assert_nullable(BlockReader::total_difficulty, field)
1178                }
1179                BlockField::Uncles => assert_nullable(BlockReader::uncles, field),
1180                BlockField::BaseFeePerGas => assert_nullable(BlockReader::base_fee_per_gas, field),
1181                BlockField::BlobGasUsed => assert_nullable(BlockReader::blob_gas_used, field),
1182                BlockField::ExcessBlobGas => assert_nullable(BlockReader::excess_blob_gas, field),
1183                BlockField::ParentBeaconBlockRoot => {
1184                    assert_nullable(BlockReader::parent_beacon_block_root, field)
1185                }
1186                BlockField::WithdrawalsRoot => {
1187                    assert_nullable(BlockReader::withdrawals_root, field)
1188                }
1189                BlockField::Withdrawals => assert_nullable(BlockReader::withdrawals, field),
1190                BlockField::L1BlockNumber => assert_nullable(BlockReader::l1_block_number, field),
1191                BlockField::SendCount => assert_nullable(BlockReader::send_count, field),
1192                BlockField::SendRoot => assert_nullable(BlockReader::send_root, field),
1193                BlockField::MixHash => assert_nullable(BlockReader::mix_hash, field),
1194                // Non-nullable fields
1195                BlockField::Number => assert_not_nullable(BlockReader::number, field),
1196                BlockField::Hash => assert_not_nullable(BlockReader::hash, field),
1197                BlockField::ParentHash => assert_not_nullable(BlockReader::parent_hash, field),
1198                BlockField::Sha3Uncles => assert_not_nullable(BlockReader::sha3_uncles, field),
1199                BlockField::LogsBloom => assert_not_nullable(BlockReader::logs_bloom, field),
1200                BlockField::TransactionsRoot => {
1201                    assert_not_nullable(BlockReader::transactions_root, field)
1202                }
1203                BlockField::StateRoot => assert_not_nullable(BlockReader::state_root, field),
1204                BlockField::ReceiptsRoot => assert_not_nullable(BlockReader::receipts_root, field),
1205                BlockField::Miner => assert_not_nullable(BlockReader::miner, field),
1206                BlockField::ExtraData => assert_not_nullable(BlockReader::extra_data, field),
1207                BlockField::Size => assert_not_nullable(BlockReader::size, field),
1208                BlockField::GasLimit => assert_not_nullable(BlockReader::gas_limit, field),
1209                BlockField::GasUsed => assert_not_nullable(BlockReader::gas_used, field),
1210                BlockField::Timestamp => assert_not_nullable(BlockReader::timestamp, field),
1211            }
1212        }
1213    }
1214
1215    #[test]
1216    fn test_transaction_nullability_matches_schema() {
1217        fn assert_nullable<'a, T, F>(_: F, transaction_field: TransactionField)
1218        where
1219            F: FnOnce(&TransactionReader<'a>) -> Result<Option<T>, ReadError>,
1220        {
1221            assert!(
1222                transaction_field.is_nullable(),
1223                "Optional type should be nullable"
1224            );
1225        }
1226
1227        fn assert_not_nullable<'a, T, F>(_: F, transaction_field: TransactionField)
1228        where
1229            F: FnOnce(&TransactionReader<'a>) -> Result<T, ReadError>,
1230            T: NotOption,
1231        {
1232            assert!(!transaction_field.is_nullable(), "should not be nullable");
1233        }
1234
1235        for field in TransactionField::all() {
1236            match field {
1237                TransactionField::From => assert_nullable(TransactionReader::from, field),
1238                TransactionField::GasPrice => assert_nullable(TransactionReader::gas_price, field),
1239                TransactionField::To => assert_nullable(TransactionReader::to, field),
1240                TransactionField::V => assert_nullable(TransactionReader::v, field),
1241                TransactionField::R => assert_nullable(TransactionReader::r, field),
1242                TransactionField::S => assert_nullable(TransactionReader::s, field),
1243                TransactionField::MaxPriorityFeePerGas => {
1244                    assert_nullable(TransactionReader::max_priority_fee_per_gas, field)
1245                }
1246                TransactionField::MaxFeePerGas => {
1247                    assert_nullable(TransactionReader::max_fee_per_gas, field)
1248                }
1249                TransactionField::ChainId => assert_nullable(TransactionReader::chain_id, field),
1250                TransactionField::ContractAddress => {
1251                    assert_nullable(TransactionReader::contract_address, field)
1252                }
1253                TransactionField::Type => assert_nullable(TransactionReader::type_, field),
1254                TransactionField::Root => assert_nullable(TransactionReader::root, field),
1255                TransactionField::Status => assert_nullable(TransactionReader::status, field),
1256                TransactionField::Sighash => assert_nullable(TransactionReader::sighash, field),
1257                TransactionField::YParity => assert_nullable(TransactionReader::y_parity, field),
1258                TransactionField::AccessList => {
1259                    assert_nullable(TransactionReader::access_list, field)
1260                }
1261                TransactionField::AuthorizationList => {
1262                    assert_nullable(TransactionReader::authorization_list, field)
1263                }
1264                TransactionField::L1Fee => assert_nullable(TransactionReader::l1_fee, field),
1265                TransactionField::MaxFeePerBlobGas => {
1266                    assert_nullable(TransactionReader::max_fee_per_blob_gas, field)
1267                }
1268                TransactionField::BlobVersionedHashes => {
1269                    assert_nullable(TransactionReader::blob_versioned_hashes, field)
1270                }
1271                TransactionField::BlockHash => {
1272                    assert_not_nullable(TransactionReader::block_hash, field)
1273                }
1274                TransactionField::BlockNumber => {
1275                    assert_not_nullable(TransactionReader::block_number, field)
1276                }
1277                TransactionField::Gas => assert_not_nullable(TransactionReader::gas, field),
1278                TransactionField::Hash => assert_not_nullable(TransactionReader::hash, field),
1279                TransactionField::Input => assert_not_nullable(TransactionReader::input, field),
1280                TransactionField::Nonce => assert_not_nullable(TransactionReader::nonce, field),
1281                TransactionField::TransactionIndex => {
1282                    assert_not_nullable(TransactionReader::transaction_index, field)
1283                }
1284                TransactionField::Value => assert_not_nullable(TransactionReader::value, field),
1285                TransactionField::CumulativeGasUsed => {
1286                    assert_not_nullable(TransactionReader::cumulative_gas_used, field)
1287                }
1288                TransactionField::EffectiveGasPrice => {
1289                    assert_not_nullable(TransactionReader::effective_gas_price, field)
1290                }
1291                TransactionField::GasUsed => {
1292                    assert_not_nullable(TransactionReader::gas_used, field)
1293                }
1294                TransactionField::LogsBloom => {
1295                    assert_not_nullable(TransactionReader::logs_bloom, field)
1296                }
1297                TransactionField::L1GasPrice => {
1298                    assert_nullable(TransactionReader::l1_gas_price, field)
1299                }
1300                TransactionField::L1GasUsed => {
1301                    assert_nullable(TransactionReader::l1_gas_used, field)
1302                }
1303                TransactionField::L1FeeScalar => {
1304                    assert_nullable(TransactionReader::l1_fee_scalar, field)
1305                }
1306                TransactionField::GasUsedForL1 => {
1307                    assert_nullable(TransactionReader::gas_used_for_l1, field)
1308                }
1309                TransactionField::BlobGasPrice => {
1310                    assert_nullable(TransactionReader::blob_gas_price, field)
1311                }
1312                TransactionField::BlobGasUsed => {
1313                    assert_nullable(TransactionReader::blob_gas_used, field)
1314                }
1315                TransactionField::DepositNonce => {
1316                    assert_nullable(TransactionReader::deposit_nonce, field)
1317                }
1318                TransactionField::DepositReceiptVersion => {
1319                    assert_nullable(TransactionReader::deposit_receipt_version, field)
1320                }
1321                TransactionField::L1BaseFeeScalar => {
1322                    assert_nullable(TransactionReader::l1_base_fee_scalar, field)
1323                }
1324                TransactionField::L1BlobBaseFee => {
1325                    assert_nullable(TransactionReader::l1_blob_base_fee, field)
1326                }
1327                TransactionField::L1BlobBaseFeeScalar => {
1328                    assert_nullable(TransactionReader::l1_blob_base_fee_scalar, field)
1329                }
1330
1331                TransactionField::L1BlockNumber => {
1332                    assert_nullable(TransactionReader::l1_block_number, field)
1333                }
1334                TransactionField::Mint => assert_nullable(TransactionReader::mint, field),
1335                TransactionField::SourceHash => {
1336                    assert_nullable(TransactionReader::source_hash, field)
1337                }
1338            }
1339        }
1340    }
1341
1342    #[test]
1343    fn test_trace_nullability_matches_schema() {
1344        fn assert_nullable<'a, T, F>(_: F, trace_field: TraceField)
1345        where
1346            F: FnOnce(&TraceReader<'a>) -> Result<Option<T>, ReadError>,
1347        {
1348            assert!(
1349                trace_field.is_nullable(),
1350                "Optional type should be nullable"
1351            );
1352        }
1353
1354        fn assert_not_nullable<'a, T, F>(_: F, trace_field: TraceField)
1355        where
1356            F: FnOnce(&TraceReader<'a>) -> Result<T, ReadError>,
1357            T: NotOption,
1358        {
1359            assert!(!trace_field.is_nullable(), "should not be nullable");
1360        }
1361
1362        for field in TraceField::all() {
1363            match field {
1364                // Nullable fields
1365                TraceField::TransactionHash => {
1366                    assert_nullable(TraceReader::transaction_hash, field)
1367                }
1368                TraceField::TransactionPosition => {
1369                    assert_nullable(TraceReader::transaction_position, field)
1370                }
1371                TraceField::Type => assert_nullable(TraceReader::type_, field),
1372                TraceField::Error => assert_nullable(TraceReader::error, field),
1373                TraceField::From => assert_nullable(TraceReader::from, field),
1374                TraceField::To => assert_nullable(TraceReader::to, field),
1375                TraceField::Author => assert_nullable(TraceReader::author, field),
1376                TraceField::Gas => assert_nullable(TraceReader::gas, field),
1377                TraceField::GasUsed => assert_nullable(TraceReader::gas_used, field),
1378                TraceField::ActionAddress => assert_nullable(TraceReader::action_address, field),
1379                TraceField::Address => assert_nullable(TraceReader::address, field),
1380                TraceField::Balance => assert_nullable(TraceReader::balance, field),
1381                TraceField::CallType => assert_nullable(TraceReader::call_type, field),
1382                TraceField::Code => assert_nullable(TraceReader::code, field),
1383                TraceField::Init => assert_nullable(TraceReader::init, field),
1384                TraceField::Input => assert_nullable(TraceReader::input, field),
1385                TraceField::Output => assert_nullable(TraceReader::output, field),
1386                TraceField::RefundAddress => assert_nullable(TraceReader::refund_address, field),
1387                TraceField::RewardType => assert_nullable(TraceReader::reward_type, field),
1388                TraceField::Sighash => assert_nullable(TraceReader::sighash, field),
1389                TraceField::Subtraces => assert_nullable(TraceReader::subtraces, field),
1390                TraceField::TraceAddress => assert_nullable(TraceReader::trace_address, field),
1391                TraceField::Value => assert_nullable(TraceReader::value, field),
1392                // Non-nullable fields
1393                TraceField::BlockHash => assert_not_nullable(TraceReader::block_hash, field),
1394                TraceField::BlockNumber => assert_not_nullable(TraceReader::block_number, field),
1395            }
1396        }
1397    }
1398
1399    fn assert_ok<T, E: std::fmt::Debug>(result: Result<T, E>) {
1400        let _ = result.expect("should be ok");
1401    }
1402
1403    #[tokio::test(flavor = "multi_thread")]
1404    #[ignore = "integration test for returned schema"]
1405    async fn test_readers_integration() -> anyhow::Result<()> {
1406        use crate::{
1407            net_types::{LogField, LogFilter, TransactionField, TransactionFilter},
1408            Client, Query,
1409        };
1410        let client = Client::builder()
1411            .url("https://eth-traces.hypersync.xyz")
1412            .api_token(dotenvy::var("HYPERSYNC_API_TOKEN")?)
1413            .build()
1414            .context("Failed to build client")?;
1415
1416        let query = Query::new()
1417            .from_block(20_000_000)
1418            .to_block_excl(20_000_001)
1419            .include_all_blocks()
1420            .where_logs(LogFilter::all())
1421            .where_transactions(TransactionFilter::all())
1422            .select_log_fields(LogField::all())
1423            .select_block_fields(BlockField::all())
1424            .select_transaction_fields(TransactionField::all())
1425            .select_trace_fields(TraceField::all());
1426
1427        let res = client.collect_arrow(query, Default::default()).await?;
1428
1429        let mut num_logs = 0;
1430
1431        for batch in res.data.logs {
1432            for log_reader in LogReader::iter(&batch) {
1433                num_logs += 1;
1434
1435                for log_field in LogField::all() {
1436                    match log_field {
1437                        LogField::Removed => assert_ok(log_reader.removed()),
1438                        LogField::LogIndex => assert_ok(log_reader.log_index()),
1439                        LogField::TransactionIndex => assert_ok(log_reader.transaction_index()),
1440                        LogField::TransactionHash => assert_ok(log_reader.transaction_hash()),
1441                        LogField::BlockHash => assert_ok(log_reader.block_hash()),
1442                        LogField::BlockNumber => assert_ok(log_reader.block_number()),
1443                        LogField::Address => assert_ok(log_reader.address()),
1444                        LogField::Data => assert_ok(log_reader.data()),
1445                        LogField::Topic0 => assert_ok(log_reader.topic0()),
1446                        LogField::Topic1 => assert_ok(log_reader.topic1()),
1447                        LogField::Topic2 => assert_ok(log_reader.topic2()),
1448                        LogField::Topic3 => assert_ok(log_reader.topic3()),
1449                    }
1450                }
1451            }
1452        }
1453
1454        println!("num_logs: {}", num_logs);
1455
1456        let mut num_transactions = 0;
1457
1458        for batch in res.data.transactions {
1459            for transaction_reader in TransactionReader::iter(&batch) {
1460                num_transactions += 1;
1461
1462                for transaction_field in TransactionField::all() {
1463                    match transaction_field {
1464                        TransactionField::BlockHash => assert_ok(transaction_reader.block_hash()),
1465                        TransactionField::BlockNumber => {
1466                            assert_ok(transaction_reader.block_number())
1467                        }
1468                        TransactionField::From => assert_ok(transaction_reader.from()),
1469                        TransactionField::Gas => assert_ok(transaction_reader.gas()),
1470                        TransactionField::GasPrice => assert_ok(transaction_reader.gas_price()),
1471                        TransactionField::Hash => assert_ok(transaction_reader.hash()),
1472                        TransactionField::Input => assert_ok(transaction_reader.input()),
1473                        TransactionField::Nonce => assert_ok(transaction_reader.nonce()),
1474                        TransactionField::To => assert_ok(transaction_reader.to()),
1475                        TransactionField::TransactionIndex => {
1476                            assert_ok(transaction_reader.transaction_index())
1477                        }
1478                        TransactionField::Value => assert_ok(transaction_reader.value()),
1479                        TransactionField::V => assert_ok(transaction_reader.v()),
1480                        TransactionField::R => assert_ok(transaction_reader.r()),
1481                        TransactionField::S => assert_ok(transaction_reader.s()),
1482                        TransactionField::MaxPriorityFeePerGas => {
1483                            assert_ok(transaction_reader.max_priority_fee_per_gas())
1484                        }
1485                        TransactionField::MaxFeePerGas => {
1486                            assert_ok(transaction_reader.max_fee_per_gas())
1487                        }
1488                        TransactionField::ChainId => assert_ok(transaction_reader.chain_id()),
1489                        TransactionField::CumulativeGasUsed => {
1490                            assert_ok(transaction_reader.cumulative_gas_used())
1491                        }
1492                        TransactionField::EffectiveGasPrice => {
1493                            assert_ok(transaction_reader.effective_gas_price())
1494                        }
1495                        TransactionField::GasUsed => assert_ok(transaction_reader.gas_used()),
1496                        TransactionField::ContractAddress => {
1497                            assert_ok(transaction_reader.contract_address())
1498                        }
1499                        TransactionField::LogsBloom => assert_ok(transaction_reader.logs_bloom()),
1500                        TransactionField::Type => assert_ok(transaction_reader.type_()),
1501                        TransactionField::Root => assert_ok(transaction_reader.root()),
1502                        TransactionField::Status => assert_ok(transaction_reader.status()),
1503                        TransactionField::Sighash => assert_ok(transaction_reader.sighash()),
1504                        TransactionField::YParity => assert_ok(transaction_reader.y_parity()),
1505                        TransactionField::AccessList => assert_ok(transaction_reader.access_list()),
1506                        TransactionField::AuthorizationList => {
1507                            assert_ok(transaction_reader.authorization_list())
1508                        }
1509                        TransactionField::L1Fee => assert_ok(transaction_reader.l1_fee()),
1510                        TransactionField::MaxFeePerBlobGas => {
1511                            assert_ok(transaction_reader.max_fee_per_blob_gas())
1512                        }
1513                        TransactionField::BlobVersionedHashes => {
1514                            assert_ok(transaction_reader.blob_versioned_hashes())
1515                        }
1516                        TransactionField::L1GasPrice => {
1517                            assert_ok(transaction_reader.l1_gas_price())
1518                        }
1519                        TransactionField::L1GasUsed => assert_ok(transaction_reader.l1_gas_used()),
1520                        TransactionField::L1FeeScalar => {
1521                            assert_ok(transaction_reader.l1_fee_scalar())
1522                        }
1523                        TransactionField::GasUsedForL1 => {
1524                            assert_ok(transaction_reader.gas_used_for_l1())
1525                        }
1526                        TransactionField::BlobGasPrice => {
1527                            assert_ok(transaction_reader.blob_gas_price())
1528                        }
1529                        TransactionField::BlobGasUsed => {
1530                            assert_ok(transaction_reader.blob_gas_used())
1531                        }
1532                        TransactionField::DepositNonce => {
1533                            assert_ok(transaction_reader.deposit_nonce())
1534                        }
1535                        TransactionField::DepositReceiptVersion => {
1536                            assert_ok(transaction_reader.deposit_receipt_version())
1537                        }
1538                        TransactionField::L1BaseFeeScalar => {
1539                            assert_ok(transaction_reader.l1_base_fee_scalar())
1540                        }
1541                        TransactionField::L1BlobBaseFee => {
1542                            assert_ok(transaction_reader.l1_blob_base_fee())
1543                        }
1544                        TransactionField::L1BlobBaseFeeScalar => {
1545                            assert_ok(transaction_reader.l1_blob_base_fee_scalar())
1546                        }
1547                        TransactionField::L1BlockNumber => {
1548                            assert_ok(transaction_reader.l1_block_number())
1549                        }
1550                        TransactionField::Mint => assert_ok(transaction_reader.mint()),
1551                        TransactionField::SourceHash => assert_ok(transaction_reader.source_hash()),
1552                    }
1553                }
1554            }
1555        }
1556
1557        println!("num_transactions: {}", num_transactions);
1558        let mut num_blocks = 0;
1559
1560        for batch in res.data.blocks {
1561            for block_reader in BlockReader::iter(&batch) {
1562                num_blocks += 1;
1563
1564                for block_field in BlockField::all() {
1565                    match block_field {
1566                        BlockField::Number => assert_ok(block_reader.number()),
1567                        BlockField::Hash => assert_ok(block_reader.hash()),
1568                        BlockField::ParentHash => assert_ok(block_reader.parent_hash()),
1569                        BlockField::Nonce => assert_ok(block_reader.nonce()),
1570                        BlockField::Sha3Uncles => assert_ok(block_reader.sha3_uncles()),
1571                        BlockField::LogsBloom => assert_ok(block_reader.logs_bloom()),
1572                        BlockField::TransactionsRoot => assert_ok(block_reader.transactions_root()),
1573                        BlockField::StateRoot => assert_ok(block_reader.state_root()),
1574                        BlockField::ReceiptsRoot => assert_ok(block_reader.receipts_root()),
1575                        BlockField::Miner => assert_ok(block_reader.miner()),
1576                        BlockField::Difficulty => assert_ok(block_reader.difficulty()),
1577                        BlockField::TotalDifficulty => assert_ok(block_reader.total_difficulty()),
1578                        BlockField::ExtraData => assert_ok(block_reader.extra_data()),
1579                        BlockField::Size => assert_ok(block_reader.size()),
1580                        BlockField::GasLimit => assert_ok(block_reader.gas_limit()),
1581                        BlockField::GasUsed => assert_ok(block_reader.gas_used()),
1582                        BlockField::Timestamp => assert_ok(block_reader.timestamp()),
1583                        BlockField::Uncles => assert_ok(block_reader.uncles()),
1584                        BlockField::BaseFeePerGas => assert_ok(block_reader.base_fee_per_gas()),
1585                        BlockField::BlobGasUsed => assert_ok(block_reader.blob_gas_used()),
1586                        BlockField::ExcessBlobGas => assert_ok(block_reader.excess_blob_gas()),
1587                        BlockField::ParentBeaconBlockRoot => {
1588                            assert_ok(block_reader.parent_beacon_block_root())
1589                        }
1590                        BlockField::WithdrawalsRoot => assert_ok(block_reader.withdrawals_root()),
1591                        BlockField::Withdrawals => assert_ok(block_reader.withdrawals()),
1592                        BlockField::L1BlockNumber => assert_ok(block_reader.l1_block_number()),
1593                        BlockField::SendCount => assert_ok(block_reader.send_count()),
1594                        BlockField::SendRoot => assert_ok(block_reader.send_root()),
1595                        BlockField::MixHash => assert_ok(block_reader.mix_hash()),
1596                    }
1597                }
1598            }
1599        }
1600
1601        println!("num_blocks: {}", num_blocks);
1602
1603        let mut num_traces = 0;
1604
1605        for batch in res.data.traces {
1606            for trace_reader in TraceReader::iter(&batch) {
1607                num_traces += 1;
1608
1609                for trace_field in TraceField::all() {
1610                    match trace_field {
1611                        TraceField::BlockHash => assert_ok(trace_reader.block_hash()),
1612                        TraceField::BlockNumber => assert_ok(trace_reader.block_number()),
1613                        TraceField::From => assert_ok(trace_reader.from()),
1614                        TraceField::To => assert_ok(trace_reader.to()),
1615                        TraceField::CallType => assert_ok(trace_reader.call_type()),
1616                        TraceField::Gas => assert_ok(trace_reader.gas()),
1617                        TraceField::Input => assert_ok(trace_reader.input()),
1618                        TraceField::Init => assert_ok(trace_reader.init()),
1619                        TraceField::Value => assert_ok(trace_reader.value()),
1620                        TraceField::Author => assert_ok(trace_reader.author()),
1621                        TraceField::RewardType => assert_ok(trace_reader.reward_type()),
1622                        TraceField::Address => assert_ok(trace_reader.address()),
1623                        TraceField::Code => assert_ok(trace_reader.code()),
1624                        TraceField::GasUsed => assert_ok(trace_reader.gas_used()),
1625                        TraceField::Output => assert_ok(trace_reader.output()),
1626                        TraceField::Subtraces => assert_ok(trace_reader.subtraces()),
1627                        TraceField::TraceAddress => assert_ok(trace_reader.trace_address()),
1628                        TraceField::TransactionHash => assert_ok(trace_reader.transaction_hash()),
1629                        TraceField::TransactionPosition => {
1630                            assert_ok(trace_reader.transaction_position())
1631                        }
1632                        TraceField::Type => assert_ok(trace_reader.type_()),
1633                        TraceField::Error => assert_ok(trace_reader.error()),
1634                        TraceField::Sighash => assert_ok(trace_reader.sighash()),
1635                        TraceField::ActionAddress => assert_ok(trace_reader.action_address()),
1636                        TraceField::Balance => assert_ok(trace_reader.balance()),
1637                        TraceField::RefundAddress => assert_ok(trace_reader.refund_address()),
1638                    }
1639                }
1640            }
1641        }
1642
1643        println!("num_traces: {}", num_traces);
1644
1645        assert!(num_traces > 0, "no traces found");
1646        assert!(num_logs > 0, "no logs found");
1647        assert!(num_transactions > 0, "no transactions found");
1648        assert!(num_blocks > 0, "no blocks found");
1649
1650        Ok(())
1651    }
1652}