Skip to main content

hypersync_client/
from_arrow.rs

1use crate::{
2    arrow_reader::{self, BlockReader, LogReader, TraceReader, TransactionReader},
3    simple_types::{Block, Log, Trace, Transaction},
4};
5use anyhow::Context;
6use arrayvec::ArrayVec;
7use arrow::array::RecordBatch;
8
9fn to_opt<T>(val: Result<T, arrow_reader::ReadError>) -> anyhow::Result<Option<T>> {
10    match val {
11        Ok(val) => Ok(Some(val)),
12        // Only column not found error is valid for flattening to a None as if it
13        // were null since the user can deselect a the column
14        Err(arrow_reader::ReadError::ColumnError(arrow_reader::ColumnError {
15            err: arrow_reader::ColumnErrorType::NotFound,
16            ..
17        })) => Ok(None),
18        // All other errors are unexpected and should be surfaced
19        Err(e) => Err(anyhow::Error::new(e)),
20    }
21}
22
23fn to_nested_opt<T>(val: Result<Option<T>, arrow_reader::ReadError>) -> anyhow::Result<Option<T>> {
24    match to_opt(val) {
25        Ok(Some(Some(val))) => Ok(Some(val)),
26        Ok(Some(None)) => Ok(None),
27        Ok(None) => Ok(None),
28        Err(e) => Err(e),
29    }
30}
31
32impl TryFrom<LogReader<'_>> for Log {
33    type Error = anyhow::Error;
34    fn try_from(reader: LogReader<'_>) -> Result<Self, Self::Error> {
35        Log::try_from(&reader)
36    }
37}
38
39impl<'a> TryFrom<&'a LogReader<'_>> for Log {
40    type Error = anyhow::Error;
41
42    fn try_from(reader: &'a LogReader<'_>) -> Result<Self, Self::Error> {
43        let removed = to_nested_opt(reader.removed()).context("read field removed")?;
44        let log_index = to_opt(reader.log_index()).context("read field log_index")?;
45        let transaction_index =
46            to_opt(reader.transaction_index()).context("read field transaction_index")?;
47        let transaction_hash =
48            to_opt(reader.transaction_hash()).context("read field transaction_hash")?;
49        let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?;
50        let block_number = to_opt(reader.block_number()).context("read field block_number")?;
51        let address = to_opt(reader.address()).context("read field address")?;
52        let data = to_opt(reader.data()).context("read field data")?;
53        let mut topics = ArrayVec::new();
54        let topic0 = to_nested_opt(reader.topic0()).context("read field topic0")?;
55        topics.push(topic0);
56        let topic1 = to_nested_opt(reader.topic1()).context("read field topic1")?;
57        topics.push(topic1);
58        let topic2 = to_nested_opt(reader.topic2()).context("read field topic2")?;
59        topics.push(topic2);
60        let topic3 = to_nested_opt(reader.topic3()).context("read field topic3")?;
61        topics.push(topic3);
62        Ok(Self {
63            removed,
64            log_index,
65            transaction_index,
66            transaction_hash,
67            block_hash,
68            block_number,
69            address,
70            data,
71            topics,
72        })
73    }
74}
75
76impl TryFrom<BlockReader<'_>> for Block {
77    type Error = anyhow::Error;
78    fn try_from(reader: BlockReader<'_>) -> Result<Self, Self::Error> {
79        Block::try_from(&reader)
80    }
81}
82
83impl<'a> TryFrom<&'a BlockReader<'_>> for Block {
84    type Error = anyhow::Error;
85
86    fn try_from(reader: &'a BlockReader<'_>) -> Result<Self, Self::Error> {
87        let number = to_opt(reader.number()).context("read field number")?;
88        let hash = to_opt(reader.hash()).context("read field hash")?;
89        let parent_hash = to_opt(reader.parent_hash()).context("read field parent_hash")?;
90        let nonce = to_nested_opt(reader.nonce()).context("read field nonce")?;
91        let sha3_uncles = to_opt(reader.sha3_uncles()).context("read field sha3_uncles")?;
92        let logs_bloom = to_opt(reader.logs_bloom()).context("read field logs_bloom")?;
93        let transactions_root =
94            to_opt(reader.transactions_root()).context("read field transactions_root")?;
95        let state_root = to_opt(reader.state_root()).context("read field state_root")?;
96        let receipts_root = to_opt(reader.receipts_root()).context("read field receipts_root")?;
97        let miner = to_opt(reader.miner()).context("read field miner")?;
98        let difficulty = to_nested_opt(reader.difficulty()).context("read field difficulty")?;
99        let total_difficulty =
100            to_nested_opt(reader.total_difficulty()).context("read field total_difficulty")?;
101        let extra_data = to_opt(reader.extra_data()).context("read field extra_data")?;
102        let size = to_opt(reader.size()).context("read field size")?;
103        let gas_limit = to_opt(reader.gas_limit()).context("read field gas_limit")?;
104        let gas_used = to_opt(reader.gas_used()).context("read field gas_used")?;
105        let timestamp = to_opt(reader.timestamp()).context("read field timestamp")?;
106        let uncles = to_nested_opt(reader.uncles()).context("read field uncles")?;
107        let base_fee_per_gas =
108            to_nested_opt(reader.base_fee_per_gas()).context("read field base_fee_per_gas")?;
109        let blob_gas_used =
110            to_nested_opt(reader.blob_gas_used()).context("read field blob_gas_used")?;
111        let excess_blob_gas =
112            to_nested_opt(reader.excess_blob_gas()).context("read field excess_blob_gas")?;
113        let parent_beacon_block_root = to_nested_opt(reader.parent_beacon_block_root())
114            .context("read field parent_beacon_block_root")?;
115        let withdrawals_root =
116            to_nested_opt(reader.withdrawals_root()).context("read field withdrawals_root")?;
117        let withdrawals = to_nested_opt(reader.withdrawals()).context("read field withdrawals")?;
118        let l1_block_number =
119            to_nested_opt(reader.l1_block_number()).context("read field l1_block_number")?;
120        let send_count = to_nested_opt(reader.send_count()).context("read field send_count")?;
121        let send_root = to_nested_opt(reader.send_root()).context("read field send_root")?;
122        let mix_hash = to_nested_opt(reader.mix_hash()).context("read field mix_hash")?;
123
124        Ok(Self {
125            number,
126            hash,
127            parent_hash,
128            nonce,
129            sha3_uncles,
130            logs_bloom,
131            transactions_root,
132            state_root,
133            receipts_root,
134            miner,
135            difficulty,
136            total_difficulty,
137            extra_data,
138            size,
139            gas_limit,
140            gas_used,
141            timestamp,
142            uncles,
143            base_fee_per_gas,
144            blob_gas_used,
145            excess_blob_gas,
146            parent_beacon_block_root,
147            withdrawals_root,
148            withdrawals,
149            l1_block_number,
150            send_count,
151            send_root,
152            mix_hash,
153        })
154    }
155}
156
157impl TryFrom<TransactionReader<'_>> for Transaction {
158    type Error = anyhow::Error;
159    fn try_from(reader: TransactionReader<'_>) -> Result<Self, Self::Error> {
160        Transaction::try_from(&reader)
161    }
162}
163
164impl<'a> TryFrom<&'a TransactionReader<'_>> for Transaction {
165    type Error = anyhow::Error;
166
167    fn try_from(reader: &'a TransactionReader<'_>) -> Result<Self, Self::Error> {
168        let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?;
169        let block_number = to_opt(reader.block_number()).context("read field block_number")?;
170        let from = to_nested_opt(reader.from()).context("read field from")?;
171        let gas = to_opt(reader.gas()).context("read field gas")?;
172        let gas_price = to_nested_opt(reader.gas_price()).context("read field gas_price")?;
173        let hash = to_opt(reader.hash()).context("read field hash")?;
174        let input = to_opt(reader.input()).context("read field input")?;
175        let nonce = to_opt(reader.nonce()).context("read field nonce")?;
176        let to = to_nested_opt(reader.to()).context("read field to")?;
177        let transaction_index =
178            to_opt(reader.transaction_index()).context("read field transaction_index")?;
179        let value = to_opt(reader.value()).context("read field value")?;
180        let v = to_nested_opt(reader.v()).context("read field v")?;
181        let r = to_nested_opt(reader.r()).context("read field r")?;
182        let s = to_nested_opt(reader.s()).context("read field s")?;
183        let y_parity = to_nested_opt(reader.y_parity()).context("read field y_parity")?;
184        let max_priority_fee_per_gas = to_nested_opt(reader.max_priority_fee_per_gas())
185            .context("read field max_priority_fee_per_gas")?;
186        let max_fee_per_gas =
187            to_nested_opt(reader.max_fee_per_gas()).context("read field max_fee_per_gas")?;
188        let chain_id = to_nested_opt(reader.chain_id()).context("read field chain_id")?;
189        let access_list = to_nested_opt(reader.access_list()).context("read field access_list")?;
190        let authorization_list =
191            to_nested_opt(reader.authorization_list()).context("read field authorization_list")?;
192        let max_fee_per_blob_gas = to_nested_opt(reader.max_fee_per_blob_gas())
193            .context("read field max_fee_per_blob_gas")?;
194        let blob_versioned_hashes = to_nested_opt(reader.blob_versioned_hashes())
195            .context("read field blob_versioned_hashes")?;
196        let cumulative_gas_used =
197            to_opt(reader.cumulative_gas_used()).context("read field cumulative_gas_used")?;
198        let effective_gas_price =
199            to_opt(reader.effective_gas_price()).context("read field effective_gas_price")?;
200        let gas_used = to_opt(reader.gas_used()).context("read field gas_used")?;
201        let contract_address =
202            to_nested_opt(reader.contract_address()).context("read field contract_address")?;
203        let logs_bloom = to_opt(reader.logs_bloom()).context("read field logs_bloom")?;
204        let type_ = to_nested_opt(reader.type_()).context("read field type_")?;
205        let root = to_nested_opt(reader.root()).context("read field root")?;
206        let status = to_nested_opt(reader.status()).context("read field status")?;
207        let l1_fee = to_nested_opt(reader.l1_fee()).context("read field l1_fee")?;
208        let l1_gas_price =
209            to_nested_opt(reader.l1_gas_price()).context("read field l1_gas_price")?;
210        let l1_gas_used = to_nested_opt(reader.l1_gas_used()).context("read field l1_gas_used")?;
211        let l1_fee_scalar =
212            to_nested_opt(reader.l1_fee_scalar()).context("read field l1_fee_scalar")?;
213        let gas_used_for_l1 =
214            to_nested_opt(reader.gas_used_for_l1()).context("read field gas_used_for_l1")?;
215        let blob_gas_price =
216            to_nested_opt(reader.blob_gas_price()).context("read field blob_gas_price")?;
217        let blob_gas_used =
218            to_nested_opt(reader.blob_gas_used()).context("read field blob_gas_used")?;
219        let deposit_nonce =
220            to_nested_opt(reader.deposit_nonce()).context("read field deposit_nonce")?;
221        let deposit_receipt_version = to_nested_opt(reader.deposit_receipt_version())
222            .context("read field deposit_receipt_version")?;
223        let l1_base_fee_scalar =
224            to_nested_opt(reader.l1_base_fee_scalar()).context("read field l1_base_fee_scalar")?;
225        let l1_blob_base_fee =
226            to_nested_opt(reader.l1_blob_base_fee()).context("read field l1_blob_base_fee")?;
227        let l1_blob_base_fee_scalar = to_nested_opt(reader.l1_blob_base_fee_scalar())
228            .context("read field l1_blob_base_fee_scalar")?;
229        let l1_block_number =
230            to_nested_opt(reader.l1_block_number()).context("read field l1_block_number")?;
231        let mint = to_nested_opt(reader.mint()).context("read field mint")?;
232        let sighash = to_nested_opt(reader.sighash()).context("read field sighash")?;
233        let source_hash = to_nested_opt(reader.source_hash()).context("read field source_hash")?;
234
235        Ok(Self {
236            block_hash,
237            block_number,
238            from,
239            gas,
240            gas_price,
241            hash,
242            input,
243            nonce,
244            to,
245            transaction_index,
246            value,
247            v,
248            r,
249            s,
250            y_parity,
251            max_priority_fee_per_gas,
252            max_fee_per_gas,
253            chain_id,
254            access_list,
255            authorization_list,
256            max_fee_per_blob_gas,
257            blob_versioned_hashes,
258            cumulative_gas_used,
259            effective_gas_price,
260            gas_used,
261            contract_address,
262            logs_bloom,
263            type_,
264            root,
265            status,
266            l1_fee,
267            l1_gas_price,
268            l1_gas_used,
269            l1_fee_scalar,
270            gas_used_for_l1,
271            blob_gas_price,
272            blob_gas_used,
273            deposit_nonce,
274            deposit_receipt_version,
275            l1_base_fee_scalar,
276            l1_blob_base_fee,
277            l1_blob_base_fee_scalar,
278            l1_block_number,
279            mint,
280            sighash,
281            source_hash,
282        })
283    }
284}
285
286impl Block {
287    /// Convert an arrow RecordBatch into a vector of Block structs
288    pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
289        let mut blocks = Vec::new();
290        for block_reader in BlockReader::iter(batch) {
291            blocks.push(
292                block_reader
293                    .try_into()
294                    .context("convert block reader to block")?,
295            );
296        }
297        Ok(blocks)
298    }
299}
300
301impl Transaction {
302    /// Convert an arrow RecordBatch into a vector of Transaction structs
303    pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
304        let mut transactions = Vec::new();
305        for transaction_reader in TransactionReader::iter(batch) {
306            transactions.push(
307                transaction_reader
308                    .try_into()
309                    .context("convert transaction reader to transaction")?,
310            );
311        }
312        Ok(transactions)
313    }
314}
315
316impl TryFrom<TraceReader<'_>> for Trace {
317    type Error = anyhow::Error;
318    fn try_from(reader: TraceReader<'_>) -> Result<Self, Self::Error> {
319        Trace::try_from(&reader)
320    }
321}
322
323impl<'a> TryFrom<&'a TraceReader<'_>> for Trace {
324    type Error = anyhow::Error;
325
326    fn try_from(reader: &'a TraceReader<'_>) -> Result<Self, Self::Error> {
327        let from = to_nested_opt(reader.from()).context("read field from")?;
328        let to = to_nested_opt(reader.to()).context("read field to")?;
329        let call_type = to_nested_opt(reader.call_type()).context("read field call_type")?;
330        let gas = to_nested_opt(reader.gas()).context("read field gas")?;
331        let input = to_nested_opt(reader.input()).context("read field input")?;
332        let init = to_nested_opt(reader.init()).context("read field init")?;
333        let value = to_nested_opt(reader.value()).context("read field value")?;
334        let author = to_nested_opt(reader.author()).context("read field author")?;
335        let reward_type = to_nested_opt(reader.reward_type()).context("read field reward_type")?;
336        let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?;
337        let block_number = to_opt(reader.block_number()).context("read field block_number")?;
338        let address = to_nested_opt(reader.address()).context("read field address")?;
339        let code = to_nested_opt(reader.code()).context("read field code")?;
340        let gas_used = to_nested_opt(reader.gas_used()).context("read field gas_used")?;
341        let output = to_nested_opt(reader.output()).context("read field output")?;
342        let subtraces = to_nested_opt(reader.subtraces()).context("read field subtraces")?;
343        let trace_address =
344            to_nested_opt(reader.trace_address()).context("read field trace_address")?;
345        let transaction_hash =
346            to_nested_opt(reader.transaction_hash()).context("read field transaction_hash")?;
347        let transaction_position = to_nested_opt(reader.transaction_position())
348            .context("read field transaction_position")?;
349        let type_ = to_nested_opt(reader.type_()).context("read field type_")?;
350        let error = to_nested_opt(reader.error()).context("read field error")?;
351        let sighash = to_nested_opt(reader.sighash()).context("read field sighash")?;
352        let action_address =
353            to_nested_opt(reader.action_address()).context("read field action_address")?;
354        let balance = to_nested_opt(reader.balance()).context("read field balance")?;
355        let refund_address =
356            to_nested_opt(reader.refund_address()).context("read field refund_address")?;
357
358        Ok(Self {
359            from,
360            to,
361            call_type,
362            gas,
363            input,
364            init,
365            value,
366            author,
367            reward_type,
368            block_hash,
369            block_number,
370            address,
371            code,
372            gas_used,
373            output,
374            subtraces,
375            trace_address,
376            transaction_hash,
377            transaction_position,
378            type_,
379            error,
380            sighash,
381            action_address,
382            balance,
383            refund_address,
384        })
385    }
386}
387
388impl Log {
389    /// Convert an arrow RecordBatch into a vector of Log structs
390    pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
391        let mut logs = Vec::new();
392        for log_reader in LogReader::iter(batch) {
393            logs.push(log_reader.try_into().context("convert log reader to log")?);
394        }
395        Ok(logs)
396    }
397}
398
399impl Trace {
400    /// Convert an arrow RecordBatch into a vector of Trace structs
401    pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result<Vec<Self>> {
402        let mut traces = Vec::new();
403        for trace_reader in TraceReader::iter(batch) {
404            traces.push(
405                trace_reader
406                    .try_into()
407                    .context("convert trace reader to trace")?,
408            );
409        }
410        Ok(traces)
411    }
412}