hypersync_client/
from_arrow.rs

1use arrayvec::ArrayVec;
2use polars_arrow::array::{
3    BinaryArray, BinaryViewArray, BooleanArray, StaticArray, UInt64Array, UInt8Array, Utf8Array,
4    Utf8ViewArray,
5};
6
7use crate::{
8    simple_types::{Block, Log, Trace, Transaction},
9    ArrowBatch,
10};
11
12/// Used to do ArrowBatch-Native Rust type conversions while consuming the input value.
13pub trait FromArrow: Sized {
14    /// Converts to the Vector type from the ArrowBatch type.
15    /// B is the type of binary array used to downcast since we use both binary views and
16    /// binary arrays
17    fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U: List<Item = str> + 'static>(
18        batch: &ArrowBatch,
19    ) -> Vec<Self>;
20
21    /// Default implementation that uses the binary array type.
22    /// IPC is written with regular binary arrays.
23    /// Named 'from_arrow' for backwards compatibility.
24    fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
25        Self::from_arrow_bin_array::<BinaryArray<i32>, Utf8Array<i32>>(batch)
26    }
27
28    /// An additional method that uses the binary view array type.
29    /// This is to be able to reuse the trait server side where Binary Views are
30    /// used instead.
31    fn from_arrow_bin_view_array(batch: &ArrowBatch) -> Vec<Self> {
32        Self::from_arrow_bin_array::<BinaryViewArray, Utf8ViewArray>(batch)
33    }
34}
35
/// Minimal positional-access abstraction over array types.
///
/// Generic code uses it to read one element by index without naming the
/// concrete array type behind a column.
pub trait List {
    /// Element type handed out by reference; it may be unsized (`[u8]`, `str`).
    type Item: ?Sized;

    /// Returns a reference to the element at position `i`, or `None` when the
    /// implementor has no value to return for that position.
    fn get_idx(&self, i: usize) -> Option<&Self::Item>;
}
43
44impl List for BinaryArray<i32> {
45    type Item = [u8];
46
47    fn get_idx(&self, i: usize) -> Option<&[u8]> {
48        self.get(i)
49    }
50}
51impl List for BinaryViewArray {
52    type Item = [u8];
53
54    fn get_idx(&self, i: usize) -> Option<&[u8]> {
55        self.get(i)
56    }
57}
58
59impl List for Utf8Array<i32> {
60    type Item = str;
61
62    fn get_idx(&self, i: usize) -> Option<&str> {
63        self.get(i)
64    }
65}
66
67impl List for Utf8ViewArray {
68    type Item = str;
69
70    fn get_idx(&self, i: usize) -> Option<&str> {
71        self.get(i)
72    }
73}
74
75fn map_binary<'a, T, B>(i: usize, arr: Option<&'a B>) -> Option<T>
76where
77    T: TryFrom<&'a [u8]>,
78    <T as TryFrom<&'a [u8]>>::Error: std::fmt::Debug,
79    B: List<Item = [u8]>,
80{
81    arr.and_then(|arr| arr.get_idx(i).map(|v| T::try_from(v).unwrap()))
82}
83
84impl FromArrow for Block {
85    fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U>(batch: &ArrowBatch) -> Vec<Self> {
86        let number = batch.column::<UInt64Array>("number").ok();
87        let hash = batch.column::<B>("hash").ok();
88        let parent_hash = batch.column::<B>("parent_hash").ok();
89        let nonce = batch.column::<B>("nonce").ok();
90        let sha3_uncles = batch.column::<B>("sha3_uncles").ok();
91        let logs_bloom = batch.column::<B>("logs_bloom").ok();
92        let transactions_root = batch.column::<B>("transactions_root").ok();
93        let state_root = batch.column::<B>("state_root").ok();
94        let receipts_root = batch.column::<B>("receipts_root").ok();
95        let miner = batch.column::<B>("miner").ok();
96        let difficulty = batch.column::<B>("difficulty").ok();
97        let total_difficulty = batch.column::<B>("total_difficulty").ok();
98        let extra_data = batch.column::<B>("extra_data").ok();
99        let size = batch.column::<B>("size").ok();
100        let gas_limit = batch.column::<B>("gas_limit").ok();
101        let gas_used = batch.column::<B>("gas_used").ok();
102        let timestamp = batch.column::<B>("timestamp").ok();
103        let uncles = batch.column::<B>("uncles").ok();
104        let base_fee_per_gas = batch.column::<B>("base_fee_per_gas").ok();
105        let blob_gas_used = batch.column::<B>("blob_gas_used").ok();
106        let excess_blob_gas = batch.column::<B>("excess_blob_gas").ok();
107        let parent_beacon_block_root = batch.column::<B>("parent_beacon_block_root").ok();
108        let withdrawals_root = batch.column::<B>("withdrawals_root").ok();
109        let withdrawals = batch.column::<B>("withdrawals").ok();
110        let l1_block_number = batch.column::<UInt64Array>("l1_block_number").ok();
111        let send_count = batch.column::<B>("send_count").ok();
112        let send_root = batch.column::<B>("send_root").ok();
113        let mix_hash = batch.column::<B>("mix_hash").ok();
114
115        (0..batch.chunk.len())
116            .map(|idx| Self {
117                number: number.and_then(|arr| arr.get(idx)),
118                hash: map_binary(idx, hash),
119                parent_hash: map_binary(idx, parent_hash),
120                nonce: map_binary(idx, nonce),
121                sha3_uncles: map_binary(idx, sha3_uncles),
122                logs_bloom: map_binary(idx, logs_bloom),
123                transactions_root: map_binary(idx, transactions_root),
124                state_root: map_binary(idx, state_root),
125                receipts_root: map_binary(idx, receipts_root),
126                miner: map_binary(idx, miner),
127                difficulty: map_binary(idx, difficulty),
128                total_difficulty: map_binary(idx, total_difficulty),
129                extra_data: map_binary(idx, extra_data),
130                size: map_binary(idx, size),
131                gas_limit: map_binary(idx, gas_limit),
132                gas_used: map_binary(idx, gas_used),
133                timestamp: map_binary(idx, timestamp),
134                uncles: uncles.and_then(|arr| {
135                    arr.get_idx(idx).map(|v| {
136                        v.chunks(32)
137                            .map(|chunk| chunk.try_into().unwrap())
138                            .collect()
139                    })
140                }),
141                base_fee_per_gas: map_binary(idx, base_fee_per_gas),
142                blob_gas_used: map_binary(idx, blob_gas_used),
143                excess_blob_gas: map_binary(idx, excess_blob_gas),
144                parent_beacon_block_root: map_binary(idx, parent_beacon_block_root),
145                withdrawals_root: map_binary(idx, withdrawals_root),
146                withdrawals: withdrawals
147                    .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
148                l1_block_number: l1_block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
149                send_count: map_binary(idx, send_count),
150                send_root: map_binary(idx, send_root),
151                mix_hash: map_binary(idx, mix_hash),
152            })
153            .collect()
154    }
155}
156
157impl FromArrow for Transaction {
158    fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U>(batch: &ArrowBatch) -> Vec<Self> {
159        let block_hash = batch.column::<B>("block_hash").ok();
160        let block_number = batch.column::<UInt64Array>("block_number").ok();
161        let from = batch.column::<B>("from").ok();
162        let gas = batch.column::<B>("gas").ok();
163        let gas_price = batch.column::<B>("gas_price").ok();
164        let hash = batch.column::<B>("hash").ok();
165        let input = batch.column::<B>("input").ok();
166        let nonce = batch.column::<B>("nonce").ok();
167        let to = batch.column::<B>("to").ok();
168        let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
169        let value = batch.column::<B>("value").ok();
170        let v = batch.column::<B>("v").ok();
171        let r = batch.column::<B>("r").ok();
172        let s = batch.column::<B>("s").ok();
173        let y_parity = batch.column::<B>("y_parity").ok();
174        let max_priority_fee_per_gas = batch.column::<B>("max_priority_fee_per_gas").ok();
175        let max_fee_per_gas = batch.column::<B>("max_fee_per_gas").ok();
176        let chain_id = batch.column::<B>("chain_id").ok();
177        let access_list = batch.column::<B>("access_list").ok();
178        let authorization_list = batch.column::<B>("authorization_list").ok();
179        let max_fee_per_blob_gas = batch.column::<B>("max_fee_per_blob_gas").ok();
180        let blob_versioned_hashes = batch.column::<B>("blob_versioned_hashes").ok();
181        let cumulative_gas_used = batch.column::<B>("cumulative_gas_used").ok();
182        let effective_gas_price = batch.column::<B>("effective_gas_price").ok();
183        let gas_used = batch.column::<B>("gas_used").ok();
184        let contract_address = batch.column::<B>("contract_address").ok();
185        let logs_bloom = batch.column::<B>("logs_bloom").ok();
186        let type_ = batch.column::<UInt8Array>("type").ok();
187        let root = batch.column::<B>("root").ok();
188        let status = batch.column::<UInt8Array>("status").ok();
189        let l1_fee = batch.column::<B>("l1_fee").ok();
190        let l1_gas_price = batch.column::<B>("l1_gas_price").ok();
191        let l1_gas_used = batch.column::<B>("l1_gas_used").ok();
192        let l1_fee_scalar = batch.column::<B>("l1_fee_scalar").ok();
193        let gas_used_for_l1 = batch.column::<B>("gas_used_for_l1").ok();
194        let blob_gas_price = batch.column::<B>("blob_gas_price").ok();
195        let blob_gas_used = batch.column::<B>("blob_gas_used").ok();
196        let deposit_nonce = batch.column::<B>("deposit_nonce").ok();
197        let deposit_receipt_version = batch.column::<B>("deposit_receipt_version").ok();
198        let l1_base_fee_scalar = batch.column::<B>("l1_base_fee_scalar").ok();
199        let l1_blob_base_fee = batch.column::<B>("l1_blob_base_fee").ok();
200        let l1_blob_base_fee_scalar = batch.column::<B>("l1_blob_base_fee_scalar").ok();
201        let l1_block_number = batch.column::<B>("l1_block_number").ok();
202        let mint = batch.column::<B>("mint").ok();
203        let sighash = batch.column::<B>("sighash").ok();
204        let source_hash = batch.column::<B>("source_hash").ok();
205
206        (0..batch.chunk.len())
207            .map(|idx| Self {
208                block_hash: map_binary(idx, block_hash),
209                block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
210                from: map_binary(idx, from),
211                gas: map_binary(idx, gas),
212                gas_price: map_binary(idx, gas_price),
213                hash: map_binary(idx, hash),
214                input: map_binary(idx, input),
215                nonce: map_binary(idx, nonce),
216                to: map_binary(idx, to),
217                transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
218                value: map_binary(idx, value),
219                v: map_binary(idx, v),
220                r: map_binary(idx, r),
221                s: map_binary(idx, s),
222                y_parity: map_binary(idx, y_parity),
223                max_priority_fee_per_gas: map_binary(idx, max_priority_fee_per_gas),
224                max_fee_per_gas: map_binary(idx, max_fee_per_gas),
225                chain_id: map_binary(idx, chain_id),
226                access_list: access_list
227                    .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
228                authorization_list: authorization_list
229                    .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
230                max_fee_per_blob_gas: map_binary(idx, max_fee_per_blob_gas),
231                blob_versioned_hashes: blob_versioned_hashes.and_then(|arr| {
232                    arr.get_idx(idx).map(|v| {
233                        v.chunks(32)
234                            .map(|chunk| chunk.try_into().unwrap())
235                            .collect()
236                    })
237                }),
238                cumulative_gas_used: map_binary(idx, cumulative_gas_used),
239                effective_gas_price: map_binary(idx, effective_gas_price),
240                gas_used: map_binary(idx, gas_used),
241                contract_address: map_binary(idx, contract_address),
242                logs_bloom: map_binary(idx, logs_bloom),
243                type_: type_.and_then(|arr| arr.get(idx).map(|v| v.into())),
244                root: map_binary(idx, root),
245                status: status.and_then(|arr| {
246                    arr.get(idx)
247                        .map(|v| hypersync_format::TransactionStatus::from_u8(v).unwrap())
248                }),
249                l1_fee: map_binary(idx, l1_fee),
250                l1_gas_price: map_binary(idx, l1_gas_price),
251                l1_gas_used: map_binary(idx, l1_gas_used),
252                l1_fee_scalar: l1_fee_scalar.and_then(|arr| {
253                    arr.get_idx(idx)
254                        .map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())
255                }),
256                gas_used_for_l1: map_binary(idx, gas_used_for_l1),
257                blob_gas_price: map_binary(idx, blob_gas_price),
258                blob_gas_used: map_binary(idx, blob_gas_used),
259                deposit_nonce: map_binary(idx, deposit_nonce),
260                deposit_receipt_version: map_binary(idx, deposit_receipt_version),
261                l1_base_fee_scalar: map_binary(idx, l1_base_fee_scalar),
262                l1_blob_base_fee: map_binary(idx, l1_blob_base_fee),
263                l1_blob_base_fee_scalar: map_binary(idx, l1_blob_base_fee_scalar),
264                l1_block_number: map_binary(idx, l1_block_number),
265                mint: map_binary(idx, mint),
266                sighash: map_binary(idx, sighash),
267                source_hash: map_binary(idx, source_hash),
268            })
269            .collect()
270    }
271}
272
273impl FromArrow for Log {
274    fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U>(batch: &ArrowBatch) -> Vec<Self> {
275        let removed = batch.column::<BooleanArray>("removed").ok();
276        let log_index = batch.column::<UInt64Array>("log_index").ok();
277        let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
278        let transaction_hash = batch.column::<B>("transaction_hash").ok();
279        let block_hash = batch.column::<B>("block_hash").ok();
280        let block_number = batch.column::<UInt64Array>("block_number").ok();
281        let address = batch.column::<B>("address").ok();
282        let data = batch.column::<B>("data").ok();
283        let topic0 = batch.column::<B>("topic0").ok();
284        let topic1 = batch.column::<B>("topic1").ok();
285        let topic2 = batch.column::<B>("topic2").ok();
286        let topic3 = batch.column::<B>("topic3").ok();
287
288        (0..batch.chunk.len())
289            .map(|idx| Self {
290                removed: removed.and_then(|arr| arr.get(idx)),
291                log_index: log_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
292                transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
293                transaction_hash: map_binary(idx, transaction_hash),
294                block_hash: map_binary(idx, block_hash),
295                block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
296                address: map_binary(idx, address),
297                data: map_binary(idx, data),
298                topics: {
299                    let mut arr = ArrayVec::new();
300
301                    arr.push(map_binary(idx, topic0));
302                    arr.push(map_binary(idx, topic1));
303                    arr.push(map_binary(idx, topic2));
304                    arr.push(map_binary(idx, topic3));
305
306                    arr
307                },
308            })
309            .collect()
310    }
311}
312
313impl FromArrow for Trace {
314    fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U: List<Item = str> + 'static>(
315        batch: &ArrowBatch,
316    ) -> Vec<Self> {
317        let from = batch.column::<B>("from").ok();
318        let to = batch.column::<B>("to").ok();
319        let call_type = batch.column::<U>("call_type").ok();
320        let gas = batch.column::<B>("gas").ok();
321        let input = batch.column::<B>("input").ok();
322        let init = batch.column::<B>("init").ok();
323        let value = batch.column::<B>("value").ok();
324        let author = batch.column::<B>("author").ok();
325        let reward_type = batch.column::<U>("reward_type").ok();
326        let block_hash = batch.column::<B>("block_hash").ok();
327        let block_number = batch.column::<UInt64Array>("block_number").ok();
328        let address = batch.column::<B>("address").ok();
329        let code = batch.column::<B>("code").ok();
330        let gas_used = batch.column::<B>("gas_used").ok();
331        let output = batch.column::<B>("output").ok();
332        let subtraces = batch.column::<UInt64Array>("subtraces").ok();
333        let trace_address = batch.column::<B>("trace_address").ok();
334        let transaction_hash = batch.column::<B>("transaction_hash").ok();
335        let transaction_position = batch.column::<UInt64Array>("transaction_position").ok();
336        let type_ = batch.column::<U>("type").ok();
337        let error = batch.column::<U>("error").ok();
338        let sighash = batch.column::<B>("sighash").ok();
339        let action_address = batch.column::<B>("action_address").ok();
340        let balance = batch.column::<B>("balance").ok();
341        let refund_address = batch.column::<B>("refund_address").ok();
342
343        (0..batch.chunk.len())
344            .map(|idx| Self {
345                from: map_binary(idx, from),
346                to: map_binary(idx, to),
347                call_type: call_type.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
348                gas: map_binary(idx, gas),
349                input: map_binary(idx, input),
350                init: map_binary(idx, init),
351                value: map_binary(idx, value),
352                author: map_binary(idx, author),
353                reward_type: reward_type.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
354                block_hash: map_binary(idx, block_hash),
355                block_number: block_number.and_then(|arr| arr.get(idx)),
356                address: map_binary(idx, address),
357                code: map_binary(idx, code),
358                gas_used: map_binary(idx, gas_used),
359                output: map_binary(idx, output),
360                subtraces: subtraces.and_then(|arr| arr.get(idx)),
361                trace_address: trace_address
362                    .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
363                transaction_hash: map_binary(idx, transaction_hash),
364                transaction_position: transaction_position.and_then(|arr| arr.get(idx)),
365                type_: type_.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
366                error: error.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
367                sighash: map_binary(idx, sighash),
368                action_address: map_binary(idx, action_address),
369                balance: map_binary(idx, balance),
370                refund_address: map_binary(idx, refund_address),
371            })
372            .collect()
373    }
374}