hypersync_client/
from_arrow.rs

1use arrayvec::ArrayVec;
2use polars_arrow::array::{
3    BinaryArray, BooleanArray, StaticArray, UInt64Array, UInt8Array, Utf8Array,
4};
5
6use crate::{
7    simple_types::{Block, Log, Trace, Transaction},
8    ArrowBatch,
9};
10
11/// Used to do ArrowBatch-Native Rust type conversions while consuming the input value.
12pub trait FromArrow: Sized {
13    /// Converts to the Vector type from the ArrowBatch type.
14    fn from_arrow(batch: &ArrowBatch) -> Vec<Self>;
15}
16
17fn map_binary<'a, T>(i: usize, arr: Option<&'a BinaryArray<i32>>) -> Option<T>
18where
19    T: TryFrom<&'a [u8]>,
20    <T as TryFrom<&'a [u8]>>::Error: std::fmt::Debug,
21{
22    arr.and_then(|arr| arr.get(i).map(|v| v.try_into().unwrap()))
23}
24
25impl FromArrow for Block {
26    fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
27        let number = batch.column::<UInt64Array>("number").ok();
28        let hash = batch.column::<BinaryArray<i32>>("hash").ok();
29        let parent_hash = batch.column::<BinaryArray<i32>>("parent_hash").ok();
30        let nonce = batch.column::<BinaryArray<i32>>("nonce").ok();
31        let sha3_uncles = batch.column::<BinaryArray<i32>>("sha3_uncles").ok();
32        let logs_bloom = batch.column::<BinaryArray<i32>>("logs_bloom").ok();
33        let transactions_root = batch.column::<BinaryArray<i32>>("transactions_root").ok();
34        let state_root = batch.column::<BinaryArray<i32>>("state_root").ok();
35        let receipts_root = batch.column::<BinaryArray<i32>>("receipts_root").ok();
36        let miner = batch.column::<BinaryArray<i32>>("miner").ok();
37        let difficulty = batch.column::<BinaryArray<i32>>("difficulty").ok();
38        let total_difficulty = batch.column::<BinaryArray<i32>>("total_difficulty").ok();
39        let extra_data = batch.column::<BinaryArray<i32>>("extra_data").ok();
40        let size = batch.column::<BinaryArray<i32>>("size").ok();
41        let gas_limit = batch.column::<BinaryArray<i32>>("gas_limit").ok();
42        let gas_used = batch.column::<BinaryArray<i32>>("gas_used").ok();
43        let timestamp = batch.column::<BinaryArray<i32>>("timestamp").ok();
44        let uncles = batch.column::<BinaryArray<i32>>("uncles").ok();
45        let base_fee_per_gas = batch.column::<BinaryArray<i32>>("base_fee_per_gas").ok();
46        let blob_gas_used = batch.column::<BinaryArray<i32>>("blob_gas_used").ok();
47        let excess_blob_gas = batch.column::<BinaryArray<i32>>("excess_blob_gas").ok();
48        let parent_beacon_block_root = batch
49            .column::<BinaryArray<i32>>("parent_beacon_block_root")
50            .ok();
51        let withdrawals_root = batch.column::<BinaryArray<i32>>("withdrawals_root").ok();
52        let withdrawals = batch.column::<BinaryArray<i32>>("withdrawals").ok();
53        let l1_block_number = batch.column::<UInt64Array>("l1_block_number").ok();
54        let send_count = batch.column::<BinaryArray<i32>>("send_count").ok();
55        let send_root = batch.column::<BinaryArray<i32>>("send_root").ok();
56        let mix_hash = batch.column::<BinaryArray<i32>>("mix_hash").ok();
57
58        (0..batch.chunk.len())
59            .map(|idx| Self {
60                number: number.and_then(|arr| arr.get(idx)),
61                hash: map_binary(idx, hash),
62                parent_hash: map_binary(idx, parent_hash),
63                nonce: map_binary(idx, nonce),
64                sha3_uncles: map_binary(idx, sha3_uncles),
65                logs_bloom: map_binary(idx, logs_bloom),
66                transactions_root: map_binary(idx, transactions_root),
67                state_root: map_binary(idx, state_root),
68                receipts_root: map_binary(idx, receipts_root),
69                miner: map_binary(idx, miner),
70                difficulty: map_binary(idx, difficulty),
71                total_difficulty: map_binary(idx, total_difficulty),
72                extra_data: map_binary(idx, extra_data),
73                size: map_binary(idx, size),
74                gas_limit: map_binary(idx, gas_limit),
75                gas_used: map_binary(idx, gas_used),
76                timestamp: map_binary(idx, timestamp),
77                uncles: uncles.and_then(|arr| {
78                    arr.get(idx).map(|v| {
79                        v.chunks(32)
80                            .map(|chunk| chunk.try_into().unwrap())
81                            .collect()
82                    })
83                }),
84                base_fee_per_gas: map_binary(idx, base_fee_per_gas),
85                blob_gas_used: map_binary(idx, blob_gas_used),
86                excess_blob_gas: map_binary(idx, excess_blob_gas),
87                parent_beacon_block_root: map_binary(idx, parent_beacon_block_root),
88                withdrawals_root: map_binary(idx, withdrawals_root),
89                withdrawals: withdrawals
90                    .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
91                l1_block_number: l1_block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
92                send_count: map_binary(idx, send_count),
93                send_root: map_binary(idx, send_root),
94                mix_hash: map_binary(idx, mix_hash),
95            })
96            .collect()
97    }
98}
99
100impl FromArrow for Transaction {
101    fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
102        let block_hash = batch.column::<BinaryArray<i32>>("block_hash").ok();
103        let block_number = batch.column::<UInt64Array>("block_number").ok();
104        let from = batch.column::<BinaryArray<i32>>("from").ok();
105        let gas = batch.column::<BinaryArray<i32>>("gas").ok();
106        let gas_price = batch.column::<BinaryArray<i32>>("gas_price").ok();
107        let hash = batch.column::<BinaryArray<i32>>("hash").ok();
108        let input = batch.column::<BinaryArray<i32>>("input").ok();
109        let nonce = batch.column::<BinaryArray<i32>>("nonce").ok();
110        let to = batch.column::<BinaryArray<i32>>("to").ok();
111        let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
112        let value = batch.column::<BinaryArray<i32>>("value").ok();
113        let v = batch.column::<BinaryArray<i32>>("v").ok();
114        let r = batch.column::<BinaryArray<i32>>("r").ok();
115        let s = batch.column::<BinaryArray<i32>>("s").ok();
116        let y_parity = batch.column::<BinaryArray<i32>>("y_parity").ok();
117        let max_priority_fee_per_gas = batch
118            .column::<BinaryArray<i32>>("max_priority_fee_per_gas")
119            .ok();
120        let max_fee_per_gas = batch.column::<BinaryArray<i32>>("max_fee_per_gas").ok();
121        let chain_id = batch.column::<BinaryArray<i32>>("chain_id").ok();
122        let access_list = batch.column::<BinaryArray<i32>>("access_list").ok();
123        let max_fee_per_blob_gas = batch
124            .column::<BinaryArray<i32>>("max_fee_per_blob_gas")
125            .ok();
126        let blob_versioned_hashes = batch
127            .column::<BinaryArray<i32>>("blob_versioned_hashes")
128            .ok();
129        let cumulative_gas_used = batch.column::<BinaryArray<i32>>("cumulative_gas_used").ok();
130        let effective_gas_price = batch.column::<BinaryArray<i32>>("effective_gas_price").ok();
131        let gas_used = batch.column::<BinaryArray<i32>>("gas_used").ok();
132        let contract_address = batch.column::<BinaryArray<i32>>("contract_address").ok();
133        let logs_bloom = batch.column::<BinaryArray<i32>>("logs_bloom").ok();
134        let kind = batch.column::<UInt8Array>("type").ok();
135        let root = batch.column::<BinaryArray<i32>>("root").ok();
136        let status = batch.column::<UInt8Array>("status").ok();
137        let l1_fee = batch.column::<BinaryArray<i32>>("l1_fee").ok();
138        let l1_gas_price = batch.column::<BinaryArray<i32>>("l1_gas_price").ok();
139        let l1_gas_used = batch.column::<BinaryArray<i32>>("l1_gas_used").ok();
140        let l1_fee_scalar = batch.column::<BinaryArray<i32>>("l1_fee_scalar").ok();
141        let gas_used_for_l1 = batch.column::<BinaryArray<i32>>("gas_used_for_l1").ok();
142
143        (0..batch.chunk.len())
144            .map(|idx| Self {
145                block_hash: map_binary(idx, block_hash),
146                block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
147                from: map_binary(idx, from),
148                gas: map_binary(idx, gas),
149                gas_price: map_binary(idx, gas_price),
150                hash: map_binary(idx, hash),
151                input: map_binary(idx, input),
152                nonce: map_binary(idx, nonce),
153                to: map_binary(idx, to),
154                transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
155                value: map_binary(idx, value),
156                v: map_binary(idx, v),
157                r: map_binary(idx, r),
158                s: map_binary(idx, s),
159                y_parity: map_binary(idx, y_parity),
160                max_priority_fee_per_gas: map_binary(idx, max_priority_fee_per_gas),
161                max_fee_per_gas: map_binary(idx, max_fee_per_gas),
162                chain_id: map_binary(idx, chain_id),
163                access_list: access_list
164                    .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
165                max_fee_per_blob_gas: map_binary(idx, max_fee_per_blob_gas),
166                blob_versioned_hashes: blob_versioned_hashes.and_then(|arr| {
167                    arr.get(idx).map(|v| {
168                        v.chunks(32)
169                            .map(|chunk| chunk.try_into().unwrap())
170                            .collect()
171                    })
172                }),
173                cumulative_gas_used: map_binary(idx, cumulative_gas_used),
174                effective_gas_price: map_binary(idx, effective_gas_price),
175                gas_used: map_binary(idx, gas_used),
176                contract_address: map_binary(idx, contract_address),
177                logs_bloom: map_binary(idx, logs_bloom),
178                kind: kind.and_then(|arr| arr.get(idx).map(|v| v.into())),
179                root: map_binary(idx, root),
180                status: status.and_then(|arr| {
181                    arr.get(idx)
182                        .map(|v| hypersync_format::TransactionStatus::from_u8(v).unwrap())
183                }),
184                l1_fee: map_binary(idx, l1_fee),
185                l1_gas_price: map_binary(idx, l1_gas_price),
186                l1_gas_used: map_binary(idx, l1_gas_used),
187                l1_fee_scalar: l1_fee_scalar.and_then(|arr| {
188                    arr.get(idx)
189                        .map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())
190                }),
191                gas_used_for_l1: map_binary(idx, gas_used_for_l1),
192            })
193            .collect()
194    }
195}
196
197impl FromArrow for Log {
198    fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
199        let removed = batch.column::<BooleanArray>("removed").ok();
200        let log_index = batch.column::<UInt64Array>("log_index").ok();
201        let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
202        let transaction_hash = batch.column::<BinaryArray<i32>>("transaction_hash").ok();
203        let block_hash = batch.column::<BinaryArray<i32>>("block_hash").ok();
204        let block_number = batch.column::<UInt64Array>("block_number").ok();
205        let address = batch.column::<BinaryArray<i32>>("address").ok();
206        let data = batch.column::<BinaryArray<i32>>("data").ok();
207        let topic0 = batch.column::<BinaryArray<i32>>("topic0").ok();
208        let topic1 = batch.column::<BinaryArray<i32>>("topic1").ok();
209        let topic2 = batch.column::<BinaryArray<i32>>("topic2").ok();
210        let topic3 = batch.column::<BinaryArray<i32>>("topic3").ok();
211
212        (0..batch.chunk.len())
213            .map(|idx| Self {
214                removed: removed.and_then(|arr| arr.get(idx)),
215                log_index: log_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
216                transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
217                transaction_hash: map_binary(idx, transaction_hash),
218                block_hash: map_binary(idx, block_hash),
219                block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
220                address: map_binary(idx, address),
221                data: map_binary(idx, data),
222                topics: {
223                    let mut arr = ArrayVec::new();
224
225                    arr.push(map_binary(idx, topic0));
226                    arr.push(map_binary(idx, topic1));
227                    arr.push(map_binary(idx, topic2));
228                    arr.push(map_binary(idx, topic3));
229
230                    arr
231                },
232            })
233            .collect()
234    }
235}
236
237impl FromArrow for Trace {
238    fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
239        let from = batch.column::<BinaryArray<i32>>("from").ok();
240        let to = batch.column::<BinaryArray<i32>>("to").ok();
241        let call_type = batch.column::<Utf8Array<i32>>("call_type").ok();
242        let gas = batch.column::<BinaryArray<i32>>("gas").ok();
243        let input = batch.column::<BinaryArray<i32>>("input").ok();
244        let init = batch.column::<BinaryArray<i32>>("init").ok();
245        let value = batch.column::<BinaryArray<i32>>("value").ok();
246        let author = batch.column::<BinaryArray<i32>>("author").ok();
247        let reward_type = batch.column::<Utf8Array<i32>>("reward_type").ok();
248        let block_hash = batch.column::<BinaryArray<i32>>("block_hash").ok();
249        let block_number = batch.column::<UInt64Array>("block_number").ok();
250        let address = batch.column::<BinaryArray<i32>>("address").ok();
251        let code = batch.column::<BinaryArray<i32>>("code").ok();
252        let gas_used = batch.column::<BinaryArray<i32>>("gas_used").ok();
253        let output = batch.column::<BinaryArray<i32>>("output").ok();
254        let subtraces = batch.column::<UInt64Array>("subtraces").ok();
255        let trace_address = batch.column::<BinaryArray<i32>>("trace_address").ok();
256        let transaction_hash = batch.column::<BinaryArray<i32>>("transaction_hash").ok();
257        let transaction_position = batch.column::<UInt64Array>("transaction_position").ok();
258        let kind = batch.column::<Utf8Array<i32>>("type").ok();
259        let error = batch.column::<Utf8Array<i32>>("error").ok();
260
261        (0..batch.chunk.len())
262            .map(|idx| Self {
263                from: map_binary(idx, from),
264                to: map_binary(idx, to),
265                call_type: call_type.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
266                gas: map_binary(idx, gas),
267                input: map_binary(idx, input),
268                init: map_binary(idx, init),
269                value: map_binary(idx, value),
270                author: map_binary(idx, author),
271                reward_type: reward_type.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
272                block_hash: map_binary(idx, block_hash),
273                block_number: block_number.and_then(|arr| arr.get(idx)),
274                address: map_binary(idx, address),
275                code: map_binary(idx, code),
276                gas_used: map_binary(idx, gas_used),
277                output: map_binary(idx, output),
278                subtraces: subtraces.and_then(|arr| arr.get(idx)),
279                trace_address: trace_address
280                    .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
281                transaction_hash: map_binary(idx, transaction_hash),
282                transaction_position: transaction_position.and_then(|arr| arr.get(idx)),
283                kind: kind.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
284                error: error.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
285            })
286            .collect()
287    }
288}