hypersync_client/
from_arrow.rs

1use crate::{
2    arrow_reader::{self, BlockReader, LogReader, TraceReader, TransactionReader},
3    simple_types::{Block, Log, Trace, Transaction},
4};
5use anyhow::Context;
6use arrayvec::ArrayVec;
7use arrow::array::RecordBatch;
8
9fn to_opt<T>(val: Result<T, arrow_reader::ReadError>) -> anyhow::Result<Option<T>> {
10    match val {
11        Ok(val) => Ok(Some(val)),
12        // Only column not found error is valid for flattening to a None as if it
13        // were null since the user can deselect a the column
14        Err(arrow_reader::ReadError::ColumnError(arrow_reader::ColumnError {
15            err: arrow_reader::ColumnErrorType::NotFound,
16            ..
17        })) => Ok(None),
18        // All other errors are unexpected and should be surfaced
19        Err(e) => Err(anyhow::Error::new(e)),
20    }
21}
22
23fn to_nested_opt<T>(val: Result<Option<T>, arrow_reader::ReadError>) -> anyhow::Result<Option<T>> {
24    match to_opt(val) {
25        Ok(Some(Some(val))) => Ok(Some(val)),
26        Ok(Some(None)) => Ok(None),
27        Ok(None) => Ok(None),
28        Err(e) => Err(e),
29    }
30}
31
32impl TryFrom<LogReader<'_>> for Log {
33    type Error = anyhow::Error;
34
35    fn try_from(reader: LogReader<'_>) -> Result<Self, Self::Error> {
36        let removed = to_nested_opt(reader.removed()).context("read field removed")?;
37        let log_index = to_opt(reader.log_index()).context("read field log_index")?;
38        let transaction_index =
39            to_opt(reader.transaction_index()).context("read field transaction_index")?;
40        let transaction_hash =
41            to_opt(reader.transaction_hash()).context("read field transaction_hash")?;
42        let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?;
43        let block_number = to_opt(reader.block_number()).context("read field block_number")?;
44        let address = to_opt(reader.address()).context("read field address")?;
45        let data = to_opt(reader.data()).context("read field data")?;
46        let mut topics = ArrayVec::new();
47        let topic0 = to_nested_opt(reader.topic0()).context("read field topic0")?;
48        topics.push(topic0);
49        let topic1 = to_nested_opt(reader.topic1()).context("read field topic1")?;
50        topics.push(topic1);
51        let topic2 = to_nested_opt(reader.topic2()).context("read field topic2")?;
52        topics.push(topic2);
53        let topic3 = to_nested_opt(reader.topic3()).context("read field topic3")?;
54        topics.push(topic3);
55        Ok(Self {
56            removed,
57            log_index,
58            transaction_index,
59            transaction_hash,
60            block_hash,
61            block_number,
62            address,
63            data,
64            topics,
65        })
66    }
67}
68
69impl TryFrom<BlockReader<'_>> for Block {
70    type Error = anyhow::Error;
71
72    fn try_from(reader: BlockReader<'_>) -> Result<Self, Self::Error> {
73        let number = to_opt(reader.number()).context("read field number")?;
74        let hash = to_opt(reader.hash()).context("read field hash")?;
75        let parent_hash = to_opt(reader.parent_hash()).context("read field parent_hash")?;
76        let nonce = to_nested_opt(reader.nonce()).context("read field nonce")?;
77        let sha3_uncles = to_opt(reader.sha3_uncles()).context("read field sha3_uncles")?;
78        let logs_bloom = to_opt(reader.logs_bloom()).context("read field logs_bloom")?;
79        let transactions_root =
80            to_opt(reader.transactions_root()).context("read field transactions_root")?;
81        let state_root = to_opt(reader.state_root()).context("read field state_root")?;
82        let receipts_root = to_opt(reader.receipts_root()).context("read field receipts_root")?;
83        let miner = to_opt(reader.miner()).context("read field miner")?;
84        let difficulty = to_nested_opt(reader.difficulty()).context("read field difficulty")?;
85        let total_difficulty =
86            to_nested_opt(reader.total_difficulty()).context("read field total_difficulty")?;
87        let extra_data = to_opt(reader.extra_data()).context("read field extra_data")?;
88        let size = to_opt(reader.size()).context("read field size")?;
89        let gas_limit = to_opt(reader.gas_limit()).context("read field gas_limit")?;
90        let gas_used = to_opt(reader.gas_used()).context("read field gas_used")?;
91        let timestamp = to_opt(reader.timestamp()).context("read field timestamp")?;
92        let uncles = to_nested_opt(reader.uncles()).context("read field uncles")?;
93        let base_fee_per_gas =
94            to_nested_opt(reader.base_fee_per_gas()).context("read field base_fee_per_gas")?;
95        let blob_gas_used =
96            to_nested_opt(reader.blob_gas_used()).context("read field blob_gas_used")?;
97        let excess_blob_gas =
98            to_nested_opt(reader.excess_blob_gas()).context("read field excess_blob_gas")?;
99        let parent_beacon_block_root = to_nested_opt(reader.parent_beacon_block_root())
100            .context("read field parent_beacon_block_root")?;
101        let withdrawals_root =
102            to_nested_opt(reader.withdrawals_root()).context("read field withdrawals_root")?;
103        let withdrawals = to_nested_opt(reader.withdrawals()).context("read field withdrawals")?;
104        let l1_block_number =
105            to_nested_opt(reader.l1_block_number()).context("read field l1_block_number")?;
106        let send_count = to_nested_opt(reader.send_count()).context("read field send_count")?;
107        let send_root = to_nested_opt(reader.send_root()).context("read field send_root")?;
108        let mix_hash = to_nested_opt(reader.mix_hash()).context("read field mix_hash")?;
109
110        Ok(Self {
111            number,
112            hash,
113            parent_hash,
114            nonce,
115            sha3_uncles,
116            logs_bloom,
117            transactions_root,
118            state_root,
119            receipts_root,
120            miner,
121            difficulty,
122            total_difficulty,
123            extra_data,
124            size,
125            gas_limit,
126            gas_used,
127            timestamp,
128            uncles,
129            base_fee_per_gas,
130            blob_gas_used,
131            excess_blob_gas,
132            parent_beacon_block_root,
133            withdrawals_root,
134            withdrawals,
135            l1_block_number,
136            send_count,
137            send_root,
138            mix_hash,
139        })
140    }
141}
142
143impl TryFrom<TransactionReader<'_>> for Transaction {
144    type Error = anyhow::Error;
145
146    fn try_from(reader: TransactionReader<'_>) -> Result<Self, Self::Error> {
147        let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?;
148        let block_number = to_opt(reader.block_number()).context("read field block_number")?;
149        let from = to_nested_opt(reader.from()).context("read field from")?;
150        let gas = to_opt(reader.gas()).context("read field gas")?;
151        let gas_price = to_nested_opt(reader.gas_price()).context("read field gas_price")?;
152        let hash = to_opt(reader.hash()).context("read field hash")?;
153        let input = to_opt(reader.input()).context("read field input")?;
154        let nonce = to_opt(reader.nonce()).context("read field nonce")?;
155        let to = to_nested_opt(reader.to()).context("read field to")?;
156        let transaction_index =
157            to_opt(reader.transaction_index()).context("read field transaction_index")?;
158        let value = to_opt(reader.value()).context("read field value")?;
159        let v = to_nested_opt(reader.v()).context("read field v")?;
160        let r = to_nested_opt(reader.r()).context("read field r")?;
161        let s = to_nested_opt(reader.s()).context("read field s")?;
162        let y_parity = to_nested_opt(reader.y_parity()).context("read field y_parity")?;
163        let max_priority_fee_per_gas = to_nested_opt(reader.max_priority_fee_per_gas())
164            .context("read field max_priority_fee_per_gas")?;
165        let max_fee_per_gas =
166            to_nested_opt(reader.max_fee_per_gas()).context("read field max_fee_per_gas")?;
167        let chain_id = to_nested_opt(reader.chain_id()).context("read field chain_id")?;
168        let access_list = to_nested_opt(reader.access_list()).context("read field access_list")?;
169        let authorization_list =
170            to_nested_opt(reader.authorization_list()).context("read field authorization_list")?;
171        let max_fee_per_blob_gas = to_nested_opt(reader.max_fee_per_blob_gas())
172            .context("read field max_fee_per_blob_gas")?;
173        let blob_versioned_hashes = to_nested_opt(reader.blob_versioned_hashes())
174            .context("read field blob_versioned_hashes")?;
175        let cumulative_gas_used =
176            to_opt(reader.cumulative_gas_used()).context("read field cumulative_gas_used")?;
177        let effective_gas_price =
178            to_opt(reader.effective_gas_price()).context("read field effective_gas_price")?;
179        let gas_used = to_opt(reader.gas_used()).context("read field gas_used")?;
180        let contract_address =
181            to_nested_opt(reader.contract_address()).context("read field contract_address")?;
182        let logs_bloom = to_opt(reader.logs_bloom()).context("read field logs_bloom")?;
183        let type_ = to_nested_opt(reader.type_()).context("read field type_")?;
184        let root = to_nested_opt(reader.root()).context("read field root")?;
185        let status = to_nested_opt(reader.status()).context("read field status")?;
186        let l1_fee = to_nested_opt(reader.l1_fee()).context("read field l1_fee")?;
187        let l1_gas_price =
188            to_nested_opt(reader.l1_gas_price()).context("read field l1_gas_price")?;
189        let l1_gas_used = to_nested_opt(reader.l1_gas_used()).context("read field l1_gas_used")?;
190        let l1_fee_scalar =
191            to_nested_opt(reader.l1_fee_scalar()).context("read field l1_fee_scalar")?;
192        let gas_used_for_l1 =
193            to_nested_opt(reader.gas_used_for_l1()).context("read field gas_used_for_l1")?;
194        let blob_gas_price =
195            to_nested_opt(reader.blob_gas_price()).context("read field blob_gas_price")?;
196        let blob_gas_used =
197            to_nested_opt(reader.blob_gas_used()).context("read field blob_gas_used")?;
198        let deposit_nonce =
199            to_nested_opt(reader.deposit_nonce()).context("read field deposit_nonce")?;
200        let deposit_receipt_version = to_nested_opt(reader.deposit_receipt_version())
201            .context("read field deposit_receipt_version")?;
202        let l1_base_fee_scalar =
203            to_nested_opt(reader.l1_base_fee_scalar()).context("read field l1_base_fee_scalar")?;
204        let l1_blob_base_fee =
205            to_nested_opt(reader.l1_blob_base_fee()).context("read field l1_blob_base_fee")?;
206        let l1_blob_base_fee_scalar = to_nested_opt(reader.l1_blob_base_fee_scalar())
207            .context("read field l1_blob_base_fee_scalar")?;
208        let l1_block_number =
209            to_nested_opt(reader.l1_block_number()).context("read field l1_block_number")?;
210        let mint = to_nested_opt(reader.mint()).context("read field mint")?;
211        let sighash = to_nested_opt(reader.sighash()).context("read field sighash")?;
212        let source_hash = to_nested_opt(reader.source_hash()).context("read field source_hash")?;
213
214        Ok(Self {
215            block_hash,
216            block_number,
217            from,
218            gas,
219            gas_price,
220            hash,
221            input,
222            nonce,
223            to,
224            transaction_index,
225            value,
226            v,
227            r,
228            s,
229            y_parity,
230            max_priority_fee_per_gas,
231            max_fee_per_gas,
232            chain_id,
233            access_list,
234            authorization_list,
235            max_fee_per_blob_gas,
236            blob_versioned_hashes,
237            cumulative_gas_used,
238            effective_gas_price,
239            gas_used,
240            contract_address,
241            logs_bloom,
242            type_,
243            root,
244            status,
245            l1_fee,
246            l1_gas_price,
247            l1_gas_used,
248            l1_fee_scalar,
249            gas_used_for_l1,
250            blob_gas_price,
251            blob_gas_used,
252            deposit_nonce,
253            deposit_receipt_version,
254            l1_base_fee_scalar,
255            l1_blob_base_fee,
256            l1_blob_base_fee_scalar,
257            l1_block_number,
258            mint,
259            sighash,
260            source_hash,
261        })
262    }
263}
264
265impl Block {
266    /// Convert an arrow RecordBatch into a vector of Block structs
267    pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
268        let mut blocks = Vec::new();
269        for block_reader in BlockReader::iter(batch) {
270            blocks.push(
271                block_reader
272                    .try_into()
273                    .context("convert block reader to block")?,
274            );
275        }
276        Ok(blocks)
277    }
278}
279
280impl Transaction {
281    /// Convert an arrow RecordBatch into a vector of Transaction structs
282    pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
283        let mut transactions = Vec::new();
284        for transaction_reader in TransactionReader::iter(batch) {
285            transactions.push(
286                transaction_reader
287                    .try_into()
288                    .context("convert transaction reader to transaction")?,
289            );
290        }
291        Ok(transactions)
292    }
293}
294
295impl TryFrom<TraceReader<'_>> for Trace {
296    type Error = anyhow::Error;
297
298    fn try_from(reader: TraceReader<'_>) -> Result<Self, Self::Error> {
299        let from = to_nested_opt(reader.from()).context("read field from")?;
300        let to = to_nested_opt(reader.to()).context("read field to")?;
301        let call_type = to_nested_opt(reader.call_type()).context("read field call_type")?;
302        let gas = to_nested_opt(reader.gas()).context("read field gas")?;
303        let input = to_nested_opt(reader.input()).context("read field input")?;
304        let init = to_nested_opt(reader.init()).context("read field init")?;
305        let value = to_nested_opt(reader.value()).context("read field value")?;
306        let author = to_nested_opt(reader.author()).context("read field author")?;
307        let reward_type = to_nested_opt(reader.reward_type()).context("read field reward_type")?;
308        let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?;
309        let block_number = to_opt(reader.block_number()).context("read field block_number")?;
310        let address = to_nested_opt(reader.address()).context("read field address")?;
311        let code = to_nested_opt(reader.code()).context("read field code")?;
312        let gas_used = to_nested_opt(reader.gas_used()).context("read field gas_used")?;
313        let output = to_nested_opt(reader.output()).context("read field output")?;
314        let subtraces = to_nested_opt(reader.subtraces()).context("read field subtraces")?;
315        let trace_address =
316            to_nested_opt(reader.trace_address()).context("read field trace_address")?;
317        let transaction_hash =
318            to_nested_opt(reader.transaction_hash()).context("read field transaction_hash")?;
319        let transaction_position = to_nested_opt(reader.transaction_position())
320            .context("read field transaction_position")?;
321        let type_ = to_nested_opt(reader.type_()).context("read field type_")?;
322        let error = to_nested_opt(reader.error()).context("read field error")?;
323        let sighash = to_nested_opt(reader.sighash()).context("read field sighash")?;
324        let action_address =
325            to_nested_opt(reader.action_address()).context("read field action_address")?;
326        let balance = to_nested_opt(reader.balance()).context("read field balance")?;
327        let refund_address =
328            to_nested_opt(reader.refund_address()).context("read field refund_address")?;
329
330        Ok(Self {
331            from,
332            to,
333            call_type,
334            gas,
335            input,
336            init,
337            value,
338            author,
339            reward_type,
340            block_hash,
341            block_number,
342            address,
343            code,
344            gas_used,
345            output,
346            subtraces,
347            trace_address,
348            transaction_hash,
349            transaction_position,
350            type_,
351            error,
352            sighash,
353            action_address,
354            balance,
355            refund_address,
356        })
357    }
358}
359
360impl Log {
361    /// Convert an arrow RecordBatch into a vector of Log structs
362    pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
363        let mut logs = Vec::new();
364        for log_reader in LogReader::iter(batch) {
365            logs.push(log_reader.try_into().context("convert log reader to log")?);
366        }
367        Ok(logs)
368    }
369}
370
371impl Trace {
372    /// Convert an arrow RecordBatch into a vector of Trace structs
373    pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
374        let mut traces = Vec::new();
375        for trace_reader in TraceReader::iter(batch) {
376            traces.push(
377                trace_reader
378                    .try_into()
379                    .context("convert trace reader to trace")?,
380            );
381        }
382        Ok(traces)
383    }
384}