// hypersync_client/from_arrow.rs

1use arrayvec::ArrayVec;
2use polars_arrow::array::{
3    BinaryArray, BinaryViewArray, BooleanArray, StaticArray, UInt64Array, UInt8Array, Utf8Array,
4    Utf8ViewArray,
5};
6
7use crate::{
8    simple_types::{Block, Log, Trace, Transaction},
9    ArrowBatch,
10};
11
12/// Used to do ArrowBatch-Native Rust type conversions while consuming the input value.
13pub trait FromArrow: Sized {
14    /// Converts to the Vector type from the ArrowBatch type.
15    /// B is the type of binary array used to downcast since we use both binary views and
16    /// binary arrays
17    fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U: List<Item = str> + 'static>(
18        batch: &ArrowBatch,
19    ) -> Vec<Self>;
20
21    /// Default implementation that uses the binary array type.
22    /// IPC is written with regular binary arrays.
23    /// Named 'from_arrow' for backwards compatibility.
24    fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
25        Self::from_arrow_bin_array::<BinaryArray<i32>, Utf8Array<i32>>(batch)
26    }
27
28    /// An additional method that uses the binary view array type.
29    /// This is to be able to reuse the trait server side where Binary Views are
30    /// used instead.
31    fn from_arrow_bin_view_array(batch: &ArrowBatch) -> Vec<Self> {
32        Self::from_arrow_bin_array::<BinaryViewArray, Utf8ViewArray>(batch)
33    }
34}
35
/// Minimal indexed-access abstraction over Arrow array types.
///
/// The conversion code is written once against this trait. That code is then reused with
/// the regular and the view flavours of binary (`Item = [u8]`) and UTF-8 (`Item = str`)
/// arrays.
pub trait List {
    /// Element type borrowed out of the array; may be unsized.
    type Item: ?Sized;

    /// Returns a borrow of the element at `index`, or `None` when there is no value.
    fn get_idx(&self, index: usize) -> Option<&Self::Item>;
}

44impl List for BinaryArray<i32> {
45    type Item = [u8];
46
47    fn get_idx(&self, i: usize) -> Option<&[u8]> {
48        self.get(i)
49    }
50}
51impl List for BinaryViewArray {
52    type Item = [u8];
53
54    fn get_idx(&self, i: usize) -> Option<&[u8]> {
55        self.get(i)
56    }
57}
58
59impl List for Utf8Array<i32> {
60    type Item = str;
61
62    fn get_idx(&self, i: usize) -> Option<&str> {
63        self.get(i)
64    }
65}
66
67impl List for Utf8ViewArray {
68    type Item = str;
69
70    fn get_idx(&self, i: usize) -> Option<&str> {
71        self.get(i)
72    }
73}
74
75fn map_binary<'a, T, B>(i: usize, arr: Option<&'a B>) -> Option<T>
76where
77    T: TryFrom<&'a [u8]>,
78    <T as TryFrom<&'a [u8]>>::Error: std::fmt::Debug,
79    B: List<Item = [u8]>,
80{
81    arr.and_then(|arr| arr.get_idx(i).map(|v| T::try_from(v).unwrap()))
82}
83
84impl FromArrow for Block {
85    fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U>(batch: &ArrowBatch) -> Vec<Self> {
86        let number = batch.column::<UInt64Array>("number").ok();
87        let hash = batch.column::<B>("hash").ok();
88        let parent_hash = batch.column::<B>("parent_hash").ok();
89        let nonce = batch.column::<B>("nonce").ok();
90        let sha3_uncles = batch.column::<B>("sha3_uncles").ok();
91        let logs_bloom = batch.column::<B>("logs_bloom").ok();
92        let transactions_root = batch.column::<B>("transactions_root").ok();
93        let state_root = batch.column::<B>("state_root").ok();
94        let receipts_root = batch.column::<B>("receipts_root").ok();
95        let miner = batch.column::<B>("miner").ok();
96        let difficulty = batch.column::<B>("difficulty").ok();
97        let total_difficulty = batch.column::<B>("total_difficulty").ok();
98        let extra_data = batch.column::<B>("extra_data").ok();
99        let size = batch.column::<B>("size").ok();
100        let gas_limit = batch.column::<B>("gas_limit").ok();
101        let gas_used = batch.column::<B>("gas_used").ok();
102        let timestamp = batch.column::<B>("timestamp").ok();
103        let uncles = batch.column::<B>("uncles").ok();
104        let base_fee_per_gas = batch.column::<B>("base_fee_per_gas").ok();
105        let blob_gas_used = batch.column::<B>("blob_gas_used").ok();
106        let excess_blob_gas = batch.column::<B>("excess_blob_gas").ok();
107        let parent_beacon_block_root = batch.column::<B>("parent_beacon_block_root").ok();
108        let withdrawals_root = batch.column::<B>("withdrawals_root").ok();
109        let withdrawals = batch.column::<B>("withdrawals").ok();
110        let l1_block_number = batch.column::<UInt64Array>("l1_block_number").ok();
111        let send_count = batch.column::<B>("send_count").ok();
112        let send_root = batch.column::<B>("send_root").ok();
113        let mix_hash = batch.column::<B>("mix_hash").ok();
114
115        (0..batch.chunk.len())
116            .map(|idx| Self {
117                number: number.and_then(|arr| arr.get(idx)),
118                hash: map_binary(idx, hash),
119                parent_hash: map_binary(idx, parent_hash),
120                nonce: map_binary(idx, nonce),
121                sha3_uncles: map_binary(idx, sha3_uncles),
122                logs_bloom: map_binary(idx, logs_bloom),
123                transactions_root: map_binary(idx, transactions_root),
124                state_root: map_binary(idx, state_root),
125                receipts_root: map_binary(idx, receipts_root),
126                miner: map_binary(idx, miner),
127                difficulty: map_binary(idx, difficulty),
128                total_difficulty: map_binary(idx, total_difficulty),
129                extra_data: map_binary(idx, extra_data),
130                size: map_binary(idx, size),
131                gas_limit: map_binary(idx, gas_limit),
132                gas_used: map_binary(idx, gas_used),
133                timestamp: map_binary(idx, timestamp),
134                uncles: uncles.and_then(|arr| {
135                    arr.get_idx(idx).map(|v| {
136                        v.chunks(32)
137                            .map(|chunk| chunk.try_into().unwrap())
138                            .collect()
139                    })
140                }),
141                base_fee_per_gas: map_binary(idx, base_fee_per_gas),
142                blob_gas_used: map_binary(idx, blob_gas_used),
143                excess_blob_gas: map_binary(idx, excess_blob_gas),
144                parent_beacon_block_root: map_binary(idx, parent_beacon_block_root),
145                withdrawals_root: map_binary(idx, withdrawals_root),
146                withdrawals: withdrawals
147                    .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
148                l1_block_number: l1_block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
149                send_count: map_binary(idx, send_count),
150                send_root: map_binary(idx, send_root),
151                mix_hash: map_binary(idx, mix_hash),
152            })
153            .collect()
154    }
155}
156
157impl FromArrow for Transaction {
158    fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U>(batch: &ArrowBatch) -> Vec<Self> {
159        let block_hash = batch.column::<B>("block_hash").ok();
160        let block_number = batch.column::<UInt64Array>("block_number").ok();
161        let from = batch.column::<B>("from").ok();
162        let gas = batch.column::<B>("gas").ok();
163        let gas_price = batch.column::<B>("gas_price").ok();
164        let hash = batch.column::<B>("hash").ok();
165        let input = batch.column::<B>("input").ok();
166        let nonce = batch.column::<B>("nonce").ok();
167        let to = batch.column::<B>("to").ok();
168        let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
169        let value = batch.column::<B>("value").ok();
170        let v = batch.column::<B>("v").ok();
171        let r = batch.column::<B>("r").ok();
172        let s = batch.column::<B>("s").ok();
173        let y_parity = batch.column::<B>("y_parity").ok();
174        let max_priority_fee_per_gas = batch.column::<B>("max_priority_fee_per_gas").ok();
175        let max_fee_per_gas = batch.column::<B>("max_fee_per_gas").ok();
176        let chain_id = batch.column::<B>("chain_id").ok();
177        let access_list = batch.column::<B>("access_list").ok();
178        let authorization_list = batch.column::<B>("authorization_list").ok();
179        let max_fee_per_blob_gas = batch.column::<B>("max_fee_per_blob_gas").ok();
180        let blob_versioned_hashes = batch.column::<B>("blob_versioned_hashes").ok();
181        let cumulative_gas_used = batch.column::<B>("cumulative_gas_used").ok();
182        let effective_gas_price = batch.column::<B>("effective_gas_price").ok();
183        let gas_used = batch.column::<B>("gas_used").ok();
184        let contract_address = batch.column::<B>("contract_address").ok();
185        let logs_bloom = batch.column::<B>("logs_bloom").ok();
186        let kind = batch.column::<UInt8Array>("type").ok();
187        let root = batch.column::<B>("root").ok();
188        let status = batch.column::<UInt8Array>("status").ok();
189        let l1_fee = batch.column::<B>("l1_fee").ok();
190        let l1_gas_price = batch.column::<B>("l1_gas_price").ok();
191        let l1_gas_used = batch.column::<B>("l1_gas_used").ok();
192        let l1_fee_scalar = batch.column::<B>("l1_fee_scalar").ok();
193        let gas_used_for_l1 = batch.column::<B>("gas_used_for_l1").ok();
194
195        (0..batch.chunk.len())
196            .map(|idx| Self {
197                block_hash: map_binary(idx, block_hash),
198                block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
199                from: map_binary(idx, from),
200                gas: map_binary(idx, gas),
201                gas_price: map_binary(idx, gas_price),
202                hash: map_binary(idx, hash),
203                input: map_binary(idx, input),
204                nonce: map_binary(idx, nonce),
205                to: map_binary(idx, to),
206                transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
207                value: map_binary(idx, value),
208                v: map_binary(idx, v),
209                r: map_binary(idx, r),
210                s: map_binary(idx, s),
211                y_parity: map_binary(idx, y_parity),
212                max_priority_fee_per_gas: map_binary(idx, max_priority_fee_per_gas),
213                max_fee_per_gas: map_binary(idx, max_fee_per_gas),
214                chain_id: map_binary(idx, chain_id),
215                access_list: access_list
216                    .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
217                authorization_list: authorization_list
218                    .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
219                max_fee_per_blob_gas: map_binary(idx, max_fee_per_blob_gas),
220                blob_versioned_hashes: blob_versioned_hashes.and_then(|arr| {
221                    arr.get_idx(idx).map(|v| {
222                        v.chunks(32)
223                            .map(|chunk| chunk.try_into().unwrap())
224                            .collect()
225                    })
226                }),
227                cumulative_gas_used: map_binary(idx, cumulative_gas_used),
228                effective_gas_price: map_binary(idx, effective_gas_price),
229                gas_used: map_binary(idx, gas_used),
230                contract_address: map_binary(idx, contract_address),
231                logs_bloom: map_binary(idx, logs_bloom),
232                kind: kind.and_then(|arr| arr.get(idx).map(|v| v.into())),
233                root: map_binary(idx, root),
234                status: status.and_then(|arr| {
235                    arr.get(idx)
236                        .map(|v| hypersync_format::TransactionStatus::from_u8(v).unwrap())
237                }),
238                l1_fee: map_binary(idx, l1_fee),
239                l1_gas_price: map_binary(idx, l1_gas_price),
240                l1_gas_used: map_binary(idx, l1_gas_used),
241                l1_fee_scalar: l1_fee_scalar.and_then(|arr| {
242                    arr.get_idx(idx)
243                        .map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())
244                }),
245                gas_used_for_l1: map_binary(idx, gas_used_for_l1),
246            })
247            .collect()
248    }
249}
250
251impl FromArrow for Log {
252    fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U>(batch: &ArrowBatch) -> Vec<Self> {
253        let removed = batch.column::<BooleanArray>("removed").ok();
254        let log_index = batch.column::<UInt64Array>("log_index").ok();
255        let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
256        let transaction_hash = batch.column::<B>("transaction_hash").ok();
257        let block_hash = batch.column::<B>("block_hash").ok();
258        let block_number = batch.column::<UInt64Array>("block_number").ok();
259        let address = batch.column::<B>("address").ok();
260        let data = batch.column::<B>("data").ok();
261        let topic0 = batch.column::<B>("topic0").ok();
262        let topic1 = batch.column::<B>("topic1").ok();
263        let topic2 = batch.column::<B>("topic2").ok();
264        let topic3 = batch.column::<B>("topic3").ok();
265
266        (0..batch.chunk.len())
267            .map(|idx| Self {
268                removed: removed.and_then(|arr| arr.get(idx)),
269                log_index: log_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
270                transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
271                transaction_hash: map_binary(idx, transaction_hash),
272                block_hash: map_binary(idx, block_hash),
273                block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
274                address: map_binary(idx, address),
275                data: map_binary(idx, data),
276                topics: {
277                    let mut arr = ArrayVec::new();
278
279                    arr.push(map_binary(idx, topic0));
280                    arr.push(map_binary(idx, topic1));
281                    arr.push(map_binary(idx, topic2));
282                    arr.push(map_binary(idx, topic3));
283
284                    arr
285                },
286            })
287            .collect()
288    }
289}
290
291impl FromArrow for Trace {
292    fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U: List<Item = str> + 'static>(
293        batch: &ArrowBatch,
294    ) -> Vec<Self> {
295        let from = batch.column::<B>("from").ok();
296        let to = batch.column::<B>("to").ok();
297        let call_type = batch.column::<U>("call_type").ok();
298        let gas = batch.column::<B>("gas").ok();
299        let input = batch.column::<B>("input").ok();
300        let init = batch.column::<B>("init").ok();
301        let value = batch.column::<B>("value").ok();
302        let author = batch.column::<B>("author").ok();
303        let reward_type = batch.column::<U>("reward_type").ok();
304        let block_hash = batch.column::<B>("block_hash").ok();
305        let block_number = batch.column::<UInt64Array>("block_number").ok();
306        let address = batch.column::<B>("address").ok();
307        let code = batch.column::<B>("code").ok();
308        let gas_used = batch.column::<B>("gas_used").ok();
309        let output = batch.column::<B>("output").ok();
310        let subtraces = batch.column::<UInt64Array>("subtraces").ok();
311        let trace_address = batch.column::<B>("trace_address").ok();
312        let transaction_hash = batch.column::<B>("transaction_hash").ok();
313        let transaction_position = batch.column::<UInt64Array>("transaction_position").ok();
314        let kind = batch.column::<U>("type").ok();
315        let error = batch.column::<U>("error").ok();
316
317        (0..batch.chunk.len())
318            .map(|idx| Self {
319                from: map_binary(idx, from),
320                to: map_binary(idx, to),
321                call_type: call_type.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
322                gas: map_binary(idx, gas),
323                input: map_binary(idx, input),
324                init: map_binary(idx, init),
325                value: map_binary(idx, value),
326                author: map_binary(idx, author),
327                reward_type: reward_type.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
328                block_hash: map_binary(idx, block_hash),
329                block_number: block_number.and_then(|arr| arr.get(idx)),
330                address: map_binary(idx, address),
331                code: map_binary(idx, code),
332                gas_used: map_binary(idx, gas_used),
333                output: map_binary(idx, output),
334                subtraces: subtraces.and_then(|arr| arr.get(idx)),
335                trace_address: trace_address
336                    .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
337                transaction_hash: map_binary(idx, transaction_hash),
338                transaction_position: transaction_position.and_then(|arr| arr.get(idx)),
339                kind: kind.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
340                error: error.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
341            })
342            .collect()
343    }
344}