Skip to main content

hypersync_client/
from_arrow.rs

1use crate::{
2    arrow_reader::{self, BlockReader, LogReader, TraceReader, TransactionReader},
3    simple_types::{Block, Log, Trace, Transaction},
4};
5use anyhow::Context;
6use arrayvec::ArrayVec;
7use arrow::array::RecordBatch;
8
9fn to_opt<T>(val: Result<T, arrow_reader::ReadError>) -> anyhow::Result<Option<T>> {
10    match val {
11        Ok(val) => Ok(Some(val)),
12        // Only column not found error is valid for flattening to a None as if it
13        // were null since the user can deselect a the column
14        Err(arrow_reader::ReadError::ColumnError(arrow_reader::ColumnError {
15            err: arrow_reader::ColumnErrorType::NotFound,
16            ..
17        })) => Ok(None),
18        // All other errors are unexpected and should be surfaced
19        Err(e) => Err(anyhow::Error::new(e)),
20    }
21}
22
23fn to_nested_opt<T>(val: Result<Option<T>, arrow_reader::ReadError>) -> anyhow::Result<Option<T>> {
24    match to_opt(val) {
25        Ok(Some(Some(val))) => Ok(Some(val)),
26        Ok(Some(None)) => Ok(None),
27        Ok(None) => Ok(None),
28        Err(e) => Err(e),
29    }
30}
31
32impl TryFrom<LogReader<'_>> for Log {
33    type Error = anyhow::Error;
34
35    fn try_from(reader: LogReader<'_>) -> Result<Self, Self::Error> {
36        let removed = to_nested_opt(reader.removed()).context("read field removed")?;
37        let log_index = to_opt(reader.log_index()).context("read field log_index")?;
38        let transaction_index =
39            to_opt(reader.transaction_index()).context("read field transaction_index")?;
40        let transaction_hash =
41            to_opt(reader.transaction_hash()).context("read field transaction_hash")?;
42        let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?;
43        let block_number = to_opt(reader.block_number()).context("read field block_number")?;
44        let address = to_opt(reader.address()).context("read field address")?;
45        let data = to_opt(reader.data()).context("read field data")?;
46        let mut topics = ArrayVec::new();
47        let topic0 = to_nested_opt(reader.topic0()).context("read field topic0")?;
48        topics.push(topic0);
49        let topic1 = to_nested_opt(reader.topic1()).context("read field topic1")?;
50        topics.push(topic1);
51        let topic2 = to_nested_opt(reader.topic2()).context("read field topic2")?;
52        topics.push(topic2);
53        let topic3 = to_nested_opt(reader.topic3()).context("read field topic3")?;
54        topics.push(topic3);
55        Ok(Self {
56            removed,
57            log_index,
58            transaction_index,
59            transaction_hash,
60            block_hash,
61            block_number,
62            address,
63            data,
64            topics,
65        })
66    }
67}
68
69impl TryFrom<BlockReader<'_>> for Block {
70    type Error = anyhow::Error;
71
72    fn try_from(reader: BlockReader<'_>) -> Result<Self, Self::Error> {
73        let number = to_opt(reader.number()).context("read field number")?;
74        let hash = to_opt(reader.hash()).context("read field hash")?;
75        let parent_hash = to_opt(reader.parent_hash()).context("read field parent_hash")?;
76        let nonce = to_nested_opt(reader.nonce()).context("read field nonce")?;
77        let sha3_uncles = to_opt(reader.sha3_uncles()).context("read field sha3_uncles")?;
78        let logs_bloom = to_opt(reader.logs_bloom()).context("read field logs_bloom")?;
79        let transactions_root =
80            to_opt(reader.transactions_root()).context("read field transactions_root")?;
81        let state_root = to_opt(reader.state_root()).context("read field state_root")?;
82        let receipts_root = to_opt(reader.receipts_root()).context("read field receipts_root")?;
83        let miner = to_opt(reader.miner()).context("read field miner")?;
84        let difficulty = to_nested_opt(reader.difficulty()).context("read field difficulty")?;
85        let total_difficulty =
86            to_nested_opt(reader.total_difficulty()).context("read field total_difficulty")?;
87        let extra_data = to_opt(reader.extra_data()).context("read field extra_data")?;
88        let size = to_opt(reader.size()).context("read field size")?;
89        let gas_limit = to_opt(reader.gas_limit()).context("read field gas_limit")?;
90        let gas_used = to_opt(reader.gas_used()).context("read field gas_used")?;
91        let timestamp = to_opt(reader.timestamp()).context("read field timestamp")?;
92        let uncles = to_nested_opt(reader.uncles()).context("read field uncles")?;
93        let base_fee_per_gas =
94            to_nested_opt(reader.base_fee_per_gas()).context("read field base_fee_per_gas")?;
95        let blob_gas_used =
96            to_nested_opt(reader.blob_gas_used()).context("read field blob_gas_used")?;
97        let excess_blob_gas =
98            to_nested_opt(reader.excess_blob_gas()).context("read field excess_blob_gas")?;
99        let parent_beacon_block_root = to_nested_opt(reader.parent_beacon_block_root())
100            .context("read field parent_beacon_block_root")?;
101        let withdrawals_root =
102            to_nested_opt(reader.withdrawals_root()).context("read field withdrawals_root")?;
103        let withdrawals = to_nested_opt(reader.withdrawals()).context("read field withdrawals")?;
104        let l1_block_number =
105            to_nested_opt(reader.l1_block_number()).context("read field l1_block_number")?;
106        let send_count = to_nested_opt(reader.send_count()).context("read field send_count")?;
107        let send_root = to_nested_opt(reader.send_root()).context("read field send_root")?;
108        let mix_hash = to_nested_opt(reader.mix_hash()).context("read field mix_hash")?;
109
110        Ok(Self {
111            number,
112            hash,
113            parent_hash,
114            nonce,
115            sha3_uncles,
116            logs_bloom,
117            transactions_root,
118            state_root,
119            receipts_root,
120            miner,
121            difficulty,
122            total_difficulty,
123            extra_data,
124            size,
125            gas_limit,
126            gas_used,
127            timestamp,
128            uncles,
129            base_fee_per_gas,
130            blob_gas_used,
131            excess_blob_gas,
132            parent_beacon_block_root,
133            withdrawals_root,
134            withdrawals,
135            l1_block_number,
136            send_count,
137            send_root,
138            mix_hash,
139        })
140    }
141}
142
143impl TryFrom<TransactionReader<'_>> for Transaction {
144    type Error = anyhow::Error;
145
146    fn try_from(reader: TransactionReader<'_>) -> Result<Self, Self::Error> {
147        let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?;
148        let block_number = to_opt(reader.block_number()).context("read field block_number")?;
149        let from = to_nested_opt(reader.from()).context("read field from")?;
150        let gas = to_opt(reader.gas()).context("read field gas")?;
151        let gas_price = to_nested_opt(reader.gas_price()).context("read field gas_price")?;
152        let hash = to_opt(reader.hash()).context("read field hash")?;
153        let input = to_opt(reader.input()).context("read field input")?;
154        let nonce = to_opt(reader.nonce()).context("read field nonce")?;
155        let to = to_nested_opt(reader.to()).context("read field to")?;
156        let transaction_index =
157            to_opt(reader.transaction_index()).context("read field transaction_index")?;
158        let value = to_opt(reader.value()).context("read field value")?;
159        let v = to_nested_opt(reader.v()).context("read field v")?;
160        let r = to_nested_opt(reader.r()).context("read field r")?;
161        let s = to_nested_opt(reader.s()).context("read field s")?;
162        let y_parity = to_nested_opt(reader.y_parity()).context("read field y_parity")?;
163        let max_priority_fee_per_gas = to_nested_opt(reader.max_priority_fee_per_gas())
164            .context("read field max_priority_fee_per_gas")?;
165        let max_fee_per_gas =
166            to_nested_opt(reader.max_fee_per_gas()).context("read field max_fee_per_gas")?;
167        let chain_id = to_nested_opt(reader.chain_id()).context("read field chain_id")?;
168        let access_list = to_nested_opt(reader.access_list()).context("read field access_list")?;
169        let authorization_list =
170            to_nested_opt(reader.authorization_list()).context("read field authorization_list")?;
171        let max_fee_per_blob_gas = to_nested_opt(reader.max_fee_per_blob_gas())
172            .context("read field max_fee_per_blob_gas")?;
173        let blob_versioned_hashes = to_nested_opt(reader.blob_versioned_hashes())
174            .context("read field blob_versioned_hashes")?;
175        let cumulative_gas_used =
176            to_opt(reader.cumulative_gas_used()).context("read field cumulative_gas_used")?;
177        let effective_gas_price =
178            to_opt(reader.effective_gas_price()).context("read field effective_gas_price")?;
179        let gas_used = to_opt(reader.gas_used()).context("read field gas_used")?;
180        let contract_address =
181            to_nested_opt(reader.contract_address()).context("read field contract_address")?;
182        let logs_bloom = to_opt(reader.logs_bloom()).context("read field logs_bloom")?;
183        let type_ = to_nested_opt(reader.type_()).context("read field type_")?;
184        let root = to_nested_opt(reader.root()).context("read field root")?;
185        let status = to_nested_opt(reader.status()).context("read field status")?;
186        let l1_fee = to_nested_opt(reader.l1_fee()).context("read field l1_fee")?;
187        let l1_gas_price =
188            to_nested_opt(reader.l1_gas_price()).context("read field l1_gas_price")?;
189        let l1_gas_used = to_nested_opt(reader.l1_gas_used()).context("read field l1_gas_used")?;
190        let l1_fee_scalar =
191            to_nested_opt(reader.l1_fee_scalar()).context("read field l1_fee_scalar")?;
192        let gas_used_for_l1 =
193            to_nested_opt(reader.gas_used_for_l1()).context("read field gas_used_for_l1")?;
194        let blob_gas_price =
195            to_nested_opt(reader.blob_gas_price()).context("read field blob_gas_price")?;
196        let blob_gas_used =
197            to_nested_opt(reader.blob_gas_used()).context("read field blob_gas_used")?;
198        let deposit_nonce =
199            to_nested_opt(reader.deposit_nonce()).context("read field deposit_nonce")?;
200        let deposit_receipt_version = to_nested_opt(reader.deposit_receipt_version())
201            .context("read field deposit_receipt_version")?;
202        let l1_base_fee_scalar =
203            to_nested_opt(reader.l1_base_fee_scalar()).context("read field l1_base_fee_scalar")?;
204        let l1_blob_base_fee =
205            to_nested_opt(reader.l1_blob_base_fee()).context("read field l1_blob_base_fee")?;
206        let l1_blob_base_fee_scalar = to_nested_opt(reader.l1_blob_base_fee_scalar())
207            .context("read field l1_blob_base_fee_scalar")?;
208        let l1_block_number =
209            to_nested_opt(reader.l1_block_number()).context("read field l1_block_number")?;
210        let mint = to_nested_opt(reader.mint()).context("read field mint")?;
211        let sighash = to_nested_opt(reader.sighash()).context("read field sighash")?;
212        let source_hash = to_nested_opt(reader.source_hash()).context("read field source_hash")?;
213        let request_id = to_nested_opt(reader.request_id()).context("read field request_id")?;
214        let ticket_id = to_nested_opt(reader.ticket_id()).context("read field ticket_id")?;
215        let refund_to = to_nested_opt(reader.refund_to()).context("read field refund_to")?;
216        let max_refund = to_nested_opt(reader.max_refund()).context("read field max_refund")?;
217        let submission_fee_refund = to_nested_opt(reader.submission_fee_refund())
218            .context("read field submission_fee_refund")?;
219        let l1_base_fee = to_nested_opt(reader.l1_base_fee()).context("read field l1_base_fee")?;
220        let deposit_value =
221            to_nested_opt(reader.deposit_value()).context("read field deposit_value")?;
222        let retry_to = to_nested_opt(reader.retry_to()).context("read field retry_to")?;
223        let retry_value = to_nested_opt(reader.retry_value()).context("read field retry_value")?;
224        let retry_data = to_nested_opt(reader.retry_data()).context("read field retry_data")?;
225        let beneficiary = to_nested_opt(reader.beneficiary()).context("read field beneficiary")?;
226        let max_submission_fee =
227            to_nested_opt(reader.max_submission_fee()).context("read field max_submission_fee")?;
228
229        Ok(Self {
230            block_hash,
231            block_number,
232            from,
233            gas,
234            gas_price,
235            hash,
236            input,
237            nonce,
238            to,
239            transaction_index,
240            value,
241            v,
242            r,
243            s,
244            y_parity,
245            max_priority_fee_per_gas,
246            max_fee_per_gas,
247            chain_id,
248            access_list,
249            authorization_list,
250            max_fee_per_blob_gas,
251            blob_versioned_hashes,
252            cumulative_gas_used,
253            effective_gas_price,
254            gas_used,
255            contract_address,
256            logs_bloom,
257            type_,
258            root,
259            status,
260            l1_fee,
261            l1_gas_price,
262            l1_gas_used,
263            l1_fee_scalar,
264            gas_used_for_l1,
265            blob_gas_price,
266            blob_gas_used,
267            deposit_nonce,
268            deposit_receipt_version,
269            l1_base_fee_scalar,
270            l1_blob_base_fee,
271            l1_blob_base_fee_scalar,
272            l1_block_number,
273            mint,
274            sighash,
275            source_hash,
276            request_id,
277            ticket_id,
278            refund_to,
279            max_refund,
280            submission_fee_refund,
281            l1_base_fee,
282            deposit_value,
283            retry_to,
284            retry_value,
285            retry_data,
286            beneficiary,
287            max_submission_fee,
288        })
289    }
290}
291
292impl Block {
293    /// Convert an arrow RecordBatch into a vector of Block structs
294    pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
295        let mut blocks = Vec::new();
296        for block_reader in BlockReader::iter(batch) {
297            blocks.push(
298                block_reader
299                    .try_into()
300                    .context("convert block reader to block")?,
301            );
302        }
303        Ok(blocks)
304    }
305}
306
307impl Transaction {
308    /// Convert an arrow RecordBatch into a vector of Transaction structs
309    pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
310        let mut transactions = Vec::new();
311        for transaction_reader in TransactionReader::iter(batch) {
312            transactions.push(
313                transaction_reader
314                    .try_into()
315                    .context("convert transaction reader to transaction")?,
316            );
317        }
318        Ok(transactions)
319    }
320}
321
322impl TryFrom<TraceReader<'_>> for Trace {
323    type Error = anyhow::Error;
324
325    fn try_from(reader: TraceReader<'_>) -> Result<Self, Self::Error> {
326        let from = to_nested_opt(reader.from()).context("read field from")?;
327        let to = to_nested_opt(reader.to()).context("read field to")?;
328        let call_type = to_nested_opt(reader.call_type()).context("read field call_type")?;
329        let gas = to_nested_opt(reader.gas()).context("read field gas")?;
330        let input = to_nested_opt(reader.input()).context("read field input")?;
331        let init = to_nested_opt(reader.init()).context("read field init")?;
332        let value = to_nested_opt(reader.value()).context("read field value")?;
333        let author = to_nested_opt(reader.author()).context("read field author")?;
334        let reward_type = to_nested_opt(reader.reward_type()).context("read field reward_type")?;
335        let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?;
336        let block_number = to_opt(reader.block_number()).context("read field block_number")?;
337        let address = to_nested_opt(reader.address()).context("read field address")?;
338        let code = to_nested_opt(reader.code()).context("read field code")?;
339        let gas_used = to_nested_opt(reader.gas_used()).context("read field gas_used")?;
340        let output = to_nested_opt(reader.output()).context("read field output")?;
341        let subtraces = to_nested_opt(reader.subtraces()).context("read field subtraces")?;
342        let trace_address =
343            to_nested_opt(reader.trace_address()).context("read field trace_address")?;
344        let transaction_hash =
345            to_nested_opt(reader.transaction_hash()).context("read field transaction_hash")?;
346        let transaction_position = to_nested_opt(reader.transaction_position())
347            .context("read field transaction_position")?;
348        let type_ = to_nested_opt(reader.type_()).context("read field type_")?;
349        let error = to_nested_opt(reader.error()).context("read field error")?;
350        let sighash = to_nested_opt(reader.sighash()).context("read field sighash")?;
351        let action_address =
352            to_nested_opt(reader.action_address()).context("read field action_address")?;
353        let balance = to_nested_opt(reader.balance()).context("read field balance")?;
354        let refund_address =
355            to_nested_opt(reader.refund_address()).context("read field refund_address")?;
356
357        Ok(Self {
358            from,
359            to,
360            call_type,
361            gas,
362            input,
363            init,
364            value,
365            author,
366            reward_type,
367            block_hash,
368            block_number,
369            address,
370            code,
371            gas_used,
372            output,
373            subtraces,
374            trace_address,
375            transaction_hash,
376            transaction_position,
377            type_,
378            error,
379            sighash,
380            action_address,
381            balance,
382            refund_address,
383        })
384    }
385}
386
387impl Log {
388    /// Convert an arrow RecordBatch into a vector of Log structs
389    pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
390        let mut logs = Vec::new();
391        for log_reader in LogReader::iter(batch) {
392            logs.push(log_reader.try_into().context("convert log reader to log")?);
393        }
394        Ok(logs)
395    }
396}
397
398impl Trace {
399    /// Convert an arrow RecordBatch into a vector of Trace structs
400    pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
401        let mut traces = Vec::new();
402        for trace_reader in TraceReader::iter(batch) {
403            traces.push(
404                trace_reader
405                    .try_into()
406                    .context("convert trace reader to trace")?,
407            );
408        }
409        Ok(traces)
410    }
411}