hypersync_client/
from_arrow.rs

1use arrayvec::ArrayVec;
2use polars_arrow::array::{
3    BinaryArray, BooleanArray, StaticArray, UInt64Array, UInt8Array, Utf8Array,
4};
5
6use crate::{
7    simple_types::{Block, Log, Trace, Transaction},
8    ArrowBatch,
9};
10
11/// Used to do ArrowBatch-Native Rust type conversions while consuming the input value.
12pub trait FromArrow: Sized {
13    /// Converts to the Vector type from the ArrowBatch type.
14    fn from_arrow(batch: &ArrowBatch) -> Vec<Self>;
15}
16
17fn map_binary<'a, T>(i: usize, arr: Option<&'a BinaryArray<i32>>) -> Option<T>
18where
19    T: TryFrom<&'a [u8]>,
20    <T as TryFrom<&'a [u8]>>::Error: std::fmt::Debug,
21{
22    arr.and_then(|arr| arr.get(i).map(|v| v.try_into().unwrap()))
23}
24
25impl FromArrow for Block {
26    fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
27        let number = batch.column::<UInt64Array>("number").ok();
28        let hash = batch.column::<BinaryArray<i32>>("hash").ok();
29        let parent_hash = batch.column::<BinaryArray<i32>>("parent_hash").ok();
30        let nonce = batch.column::<BinaryArray<i32>>("nonce").ok();
31        let sha3_uncles = batch.column::<BinaryArray<i32>>("sha3_uncles").ok();
32        let logs_bloom = batch.column::<BinaryArray<i32>>("logs_bloom").ok();
33        let transactions_root = batch.column::<BinaryArray<i32>>("transactions_root").ok();
34        let state_root = batch.column::<BinaryArray<i32>>("state_root").ok();
35        let receipts_root = batch.column::<BinaryArray<i32>>("receipts_root").ok();
36        let miner = batch.column::<BinaryArray<i32>>("miner").ok();
37        let difficulty = batch.column::<BinaryArray<i32>>("difficulty").ok();
38        let total_difficulty = batch.column::<BinaryArray<i32>>("total_difficulty").ok();
39        let extra_data = batch.column::<BinaryArray<i32>>("extra_data").ok();
40        let size = batch.column::<BinaryArray<i32>>("size").ok();
41        let gas_limit = batch.column::<BinaryArray<i32>>("gas_limit").ok();
42        let gas_used = batch.column::<BinaryArray<i32>>("gas_used").ok();
43        let timestamp = batch.column::<BinaryArray<i32>>("timestamp").ok();
44        let uncles = batch.column::<BinaryArray<i32>>("uncles").ok();
45        let base_fee_per_gas = batch.column::<BinaryArray<i32>>("base_fee_per_gas").ok();
46        let blob_gas_used = batch.column::<BinaryArray<i32>>("blob_gas_used").ok();
47        let excess_blob_gas = batch.column::<BinaryArray<i32>>("excess_blob_gas").ok();
48        let parent_beacon_block_root = batch
49            .column::<BinaryArray<i32>>("parent_beacon_block_root")
50            .ok();
51        let withdrawals_root = batch.column::<BinaryArray<i32>>("withdrawals_root").ok();
52        let withdrawals = batch.column::<BinaryArray<i32>>("withdrawals").ok();
53        let l1_block_number = batch.column::<UInt64Array>("l1_block_number").ok();
54        let send_count = batch.column::<BinaryArray<i32>>("send_count").ok();
55        let send_root = batch.column::<BinaryArray<i32>>("send_root").ok();
56        let mix_hash = batch.column::<BinaryArray<i32>>("mix_hash").ok();
57
58        (0..batch.chunk.len())
59            .map(|idx| Self {
60                number: number.and_then(|arr| arr.get(idx)),
61                hash: map_binary(idx, hash),
62                parent_hash: map_binary(idx, parent_hash),
63                nonce: map_binary(idx, nonce),
64                sha3_uncles: map_binary(idx, sha3_uncles),
65                logs_bloom: map_binary(idx, logs_bloom),
66                transactions_root: map_binary(idx, transactions_root),
67                state_root: map_binary(idx, state_root),
68                receipts_root: map_binary(idx, receipts_root),
69                miner: map_binary(idx, miner),
70                difficulty: map_binary(idx, difficulty),
71                total_difficulty: map_binary(idx, total_difficulty),
72                extra_data: map_binary(idx, extra_data),
73                size: map_binary(idx, size),
74                gas_limit: map_binary(idx, gas_limit),
75                gas_used: map_binary(idx, gas_used),
76                timestamp: map_binary(idx, timestamp),
77                uncles: uncles.and_then(|arr| {
78                    arr.get(idx).map(|v| {
79                        v.chunks(32)
80                            .map(|chunk| chunk.try_into().unwrap())
81                            .collect()
82                    })
83                }),
84                base_fee_per_gas: map_binary(idx, base_fee_per_gas),
85                blob_gas_used: map_binary(idx, blob_gas_used),
86                excess_blob_gas: map_binary(idx, excess_blob_gas),
87                parent_beacon_block_root: map_binary(idx, parent_beacon_block_root),
88                withdrawals_root: map_binary(idx, withdrawals_root),
89                withdrawals: withdrawals
90                    .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
91                l1_block_number: l1_block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
92                send_count: map_binary(idx, send_count),
93                send_root: map_binary(idx, send_root),
94                mix_hash: map_binary(idx, mix_hash),
95            })
96            .collect()
97    }
98}
99
100impl FromArrow for Transaction {
101    fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
102        let block_hash = batch.column::<BinaryArray<i32>>("block_hash").ok();
103        let block_number = batch.column::<UInt64Array>("block_number").ok();
104        let from = batch.column::<BinaryArray<i32>>("from").ok();
105        let gas = batch.column::<BinaryArray<i32>>("gas").ok();
106        let gas_price = batch.column::<BinaryArray<i32>>("gas_price").ok();
107        let hash = batch.column::<BinaryArray<i32>>("hash").ok();
108        let input = batch.column::<BinaryArray<i32>>("input").ok();
109        let nonce = batch.column::<BinaryArray<i32>>("nonce").ok();
110        let to = batch.column::<BinaryArray<i32>>("to").ok();
111        let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
112        let value = batch.column::<BinaryArray<i32>>("value").ok();
113        let v = batch.column::<BinaryArray<i32>>("v").ok();
114        let r = batch.column::<BinaryArray<i32>>("r").ok();
115        let s = batch.column::<BinaryArray<i32>>("s").ok();
116        let y_parity = batch.column::<BinaryArray<i32>>("y_parity").ok();
117        let max_priority_fee_per_gas = batch
118            .column::<BinaryArray<i32>>("max_priority_fee_per_gas")
119            .ok();
120        let max_fee_per_gas = batch.column::<BinaryArray<i32>>("max_fee_per_gas").ok();
121        let chain_id = batch.column::<BinaryArray<i32>>("chain_id").ok();
122        let access_list = batch.column::<BinaryArray<i32>>("access_list").ok();
123        let authorization_list = batch.column::<BinaryArray<i32>>("authorization_list").ok();
124        let max_fee_per_blob_gas = batch
125            .column::<BinaryArray<i32>>("max_fee_per_blob_gas")
126            .ok();
127        let blob_versioned_hashes = batch
128            .column::<BinaryArray<i32>>("blob_versioned_hashes")
129            .ok();
130        let cumulative_gas_used = batch.column::<BinaryArray<i32>>("cumulative_gas_used").ok();
131        let effective_gas_price = batch.column::<BinaryArray<i32>>("effective_gas_price").ok();
132        let gas_used = batch.column::<BinaryArray<i32>>("gas_used").ok();
133        let contract_address = batch.column::<BinaryArray<i32>>("contract_address").ok();
134        let logs_bloom = batch.column::<BinaryArray<i32>>("logs_bloom").ok();
135        let kind = batch.column::<UInt8Array>("type").ok();
136        let root = batch.column::<BinaryArray<i32>>("root").ok();
137        let status = batch.column::<UInt8Array>("status").ok();
138        let l1_fee = batch.column::<BinaryArray<i32>>("l1_fee").ok();
139        let l1_gas_price = batch.column::<BinaryArray<i32>>("l1_gas_price").ok();
140        let l1_gas_used = batch.column::<BinaryArray<i32>>("l1_gas_used").ok();
141        let l1_fee_scalar = batch.column::<BinaryArray<i32>>("l1_fee_scalar").ok();
142        let gas_used_for_l1 = batch.column::<BinaryArray<i32>>("gas_used_for_l1").ok();
143
144        (0..batch.chunk.len())
145            .map(|idx| Self {
146                block_hash: map_binary(idx, block_hash),
147                block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
148                from: map_binary(idx, from),
149                gas: map_binary(idx, gas),
150                gas_price: map_binary(idx, gas_price),
151                hash: map_binary(idx, hash),
152                input: map_binary(idx, input),
153                nonce: map_binary(idx, nonce),
154                to: map_binary(idx, to),
155                transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
156                value: map_binary(idx, value),
157                v: map_binary(idx, v),
158                r: map_binary(idx, r),
159                s: map_binary(idx, s),
160                y_parity: map_binary(idx, y_parity),
161                max_priority_fee_per_gas: map_binary(idx, max_priority_fee_per_gas),
162                max_fee_per_gas: map_binary(idx, max_fee_per_gas),
163                chain_id: map_binary(idx, chain_id),
164                access_list: access_list
165                    .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
166                authorization_list: authorization_list
167                    .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
168                max_fee_per_blob_gas: map_binary(idx, max_fee_per_blob_gas),
169                blob_versioned_hashes: blob_versioned_hashes.and_then(|arr| {
170                    arr.get(idx).map(|v| {
171                        v.chunks(32)
172                            .map(|chunk| chunk.try_into().unwrap())
173                            .collect()
174                    })
175                }),
176                cumulative_gas_used: map_binary(idx, cumulative_gas_used),
177                effective_gas_price: map_binary(idx, effective_gas_price),
178                gas_used: map_binary(idx, gas_used),
179                contract_address: map_binary(idx, contract_address),
180                logs_bloom: map_binary(idx, logs_bloom),
181                kind: kind.and_then(|arr| arr.get(idx).map(|v| v.into())),
182                root: map_binary(idx, root),
183                status: status.and_then(|arr| {
184                    arr.get(idx)
185                        .map(|v| hypersync_format::TransactionStatus::from_u8(v).unwrap())
186                }),
187                l1_fee: map_binary(idx, l1_fee),
188                l1_gas_price: map_binary(idx, l1_gas_price),
189                l1_gas_used: map_binary(idx, l1_gas_used),
190                l1_fee_scalar: l1_fee_scalar.and_then(|arr| {
191                    arr.get(idx)
192                        .map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())
193                }),
194                gas_used_for_l1: map_binary(idx, gas_used_for_l1),
195            })
196            .collect()
197    }
198}
199
200impl FromArrow for Log {
201    fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
202        let removed = batch.column::<BooleanArray>("removed").ok();
203        let log_index = batch.column::<UInt64Array>("log_index").ok();
204        let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
205        let transaction_hash = batch.column::<BinaryArray<i32>>("transaction_hash").ok();
206        let block_hash = batch.column::<BinaryArray<i32>>("block_hash").ok();
207        let block_number = batch.column::<UInt64Array>("block_number").ok();
208        let address = batch.column::<BinaryArray<i32>>("address").ok();
209        let data = batch.column::<BinaryArray<i32>>("data").ok();
210        let topic0 = batch.column::<BinaryArray<i32>>("topic0").ok();
211        let topic1 = batch.column::<BinaryArray<i32>>("topic1").ok();
212        let topic2 = batch.column::<BinaryArray<i32>>("topic2").ok();
213        let topic3 = batch.column::<BinaryArray<i32>>("topic3").ok();
214
215        (0..batch.chunk.len())
216            .map(|idx| Self {
217                removed: removed.and_then(|arr| arr.get(idx)),
218                log_index: log_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
219                transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
220                transaction_hash: map_binary(idx, transaction_hash),
221                block_hash: map_binary(idx, block_hash),
222                block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
223                address: map_binary(idx, address),
224                data: map_binary(idx, data),
225                topics: {
226                    let mut arr = ArrayVec::new();
227
228                    arr.push(map_binary(idx, topic0));
229                    arr.push(map_binary(idx, topic1));
230                    arr.push(map_binary(idx, topic2));
231                    arr.push(map_binary(idx, topic3));
232
233                    arr
234                },
235            })
236            .collect()
237    }
238}
239
240impl FromArrow for Trace {
241    fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
242        let from = batch.column::<BinaryArray<i32>>("from").ok();
243        let to = batch.column::<BinaryArray<i32>>("to").ok();
244        let call_type = batch.column::<Utf8Array<i32>>("call_type").ok();
245        let gas = batch.column::<BinaryArray<i32>>("gas").ok();
246        let input = batch.column::<BinaryArray<i32>>("input").ok();
247        let init = batch.column::<BinaryArray<i32>>("init").ok();
248        let value = batch.column::<BinaryArray<i32>>("value").ok();
249        let author = batch.column::<BinaryArray<i32>>("author").ok();
250        let reward_type = batch.column::<Utf8Array<i32>>("reward_type").ok();
251        let block_hash = batch.column::<BinaryArray<i32>>("block_hash").ok();
252        let block_number = batch.column::<UInt64Array>("block_number").ok();
253        let address = batch.column::<BinaryArray<i32>>("address").ok();
254        let code = batch.column::<BinaryArray<i32>>("code").ok();
255        let gas_used = batch.column::<BinaryArray<i32>>("gas_used").ok();
256        let output = batch.column::<BinaryArray<i32>>("output").ok();
257        let subtraces = batch.column::<UInt64Array>("subtraces").ok();
258        let trace_address = batch.column::<BinaryArray<i32>>("trace_address").ok();
259        let transaction_hash = batch.column::<BinaryArray<i32>>("transaction_hash").ok();
260        let transaction_position = batch.column::<UInt64Array>("transaction_position").ok();
261        let kind = batch.column::<Utf8Array<i32>>("type").ok();
262        let error = batch.column::<Utf8Array<i32>>("error").ok();
263
264        (0..batch.chunk.len())
265            .map(|idx| Self {
266                from: map_binary(idx, from),
267                to: map_binary(idx, to),
268                call_type: call_type.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
269                gas: map_binary(idx, gas),
270                input: map_binary(idx, input),
271                init: map_binary(idx, init),
272                value: map_binary(idx, value),
273                author: map_binary(idx, author),
274                reward_type: reward_type.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
275                block_hash: map_binary(idx, block_hash),
276                block_number: block_number.and_then(|arr| arr.get(idx)),
277                address: map_binary(idx, address),
278                code: map_binary(idx, code),
279                gas_used: map_binary(idx, gas_used),
280                output: map_binary(idx, output),
281                subtraces: subtraces.and_then(|arr| arr.get(idx)),
282                trace_address: trace_address
283                    .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
284                transaction_hash: map_binary(idx, transaction_hash),
285                transaction_position: transaction_position.and_then(|arr| arr.get(idx)),
286                kind: kind.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
287                error: error.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
288            })
289            .collect()
290    }
291}