1use arrayvec::ArrayVec;
2use polars_arrow::array::{
3 BinaryArray, BinaryViewArray, BooleanArray, StaticArray, UInt64Array, UInt8Array, Utf8Array,
4 Utf8ViewArray,
5};
6
7use crate::{
8 simple_types::{Block, Log, Trace, Transaction},
9 ArrowBatch,
10};
11
12pub trait FromArrow: Sized {
14 fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U: List<Item = str> + 'static>(
18 batch: &ArrowBatch,
19 ) -> Vec<Self>;
20
21 fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
25 Self::from_arrow_bin_array::<BinaryArray<i32>, Utf8Array<i32>>(batch)
26 }
27
28 fn from_arrow_bin_view_array(batch: &ArrowBatch) -> Vec<Self> {
32 Self::from_arrow_bin_array::<BinaryViewArray, Utf8ViewArray>(batch)
33 }
34}
35
/// Read-only, index-based access to the elements of an array-like container.
pub trait List {
    /// Element type handed out by reference; it may be unsized (e.g. `[u8]` or `str`).
    type Item: ?Sized;

    /// Returns a reference to the element stored at index `i`, or `None` when the
    /// implementation has no value to hand out for that index.
    fn get_idx(&self, i: usize) -> Option<&Self::Item>;
}
43
44impl List for BinaryArray<i32> {
45 type Item = [u8];
46
47 fn get_idx(&self, i: usize) -> Option<&[u8]> {
48 self.get(i)
49 }
50}
51impl List for BinaryViewArray {
52 type Item = [u8];
53
54 fn get_idx(&self, i: usize) -> Option<&[u8]> {
55 self.get(i)
56 }
57}
58
59impl List for Utf8Array<i32> {
60 type Item = str;
61
62 fn get_idx(&self, i: usize) -> Option<&str> {
63 self.get(i)
64 }
65}
66
67impl List for Utf8ViewArray {
68 type Item = str;
69
70 fn get_idx(&self, i: usize) -> Option<&str> {
71 self.get(i)
72 }
73}
74
75fn map_binary<'a, T, B>(i: usize, arr: Option<&'a B>) -> Option<T>
76where
77 T: TryFrom<&'a [u8]>,
78 <T as TryFrom<&'a [u8]>>::Error: std::fmt::Debug,
79 B: List<Item = [u8]>,
80{
81 arr.and_then(|arr| arr.get_idx(i).map(|v| T::try_from(v).unwrap()))
82}
83
84impl FromArrow for Block {
85 fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U>(batch: &ArrowBatch) -> Vec<Self> {
86 let number = batch.column::<UInt64Array>("number").ok();
87 let hash = batch.column::<B>("hash").ok();
88 let parent_hash = batch.column::<B>("parent_hash").ok();
89 let nonce = batch.column::<B>("nonce").ok();
90 let sha3_uncles = batch.column::<B>("sha3_uncles").ok();
91 let logs_bloom = batch.column::<B>("logs_bloom").ok();
92 let transactions_root = batch.column::<B>("transactions_root").ok();
93 let state_root = batch.column::<B>("state_root").ok();
94 let receipts_root = batch.column::<B>("receipts_root").ok();
95 let miner = batch.column::<B>("miner").ok();
96 let difficulty = batch.column::<B>("difficulty").ok();
97 let total_difficulty = batch.column::<B>("total_difficulty").ok();
98 let extra_data = batch.column::<B>("extra_data").ok();
99 let size = batch.column::<B>("size").ok();
100 let gas_limit = batch.column::<B>("gas_limit").ok();
101 let gas_used = batch.column::<B>("gas_used").ok();
102 let timestamp = batch.column::<B>("timestamp").ok();
103 let uncles = batch.column::<B>("uncles").ok();
104 let base_fee_per_gas = batch.column::<B>("base_fee_per_gas").ok();
105 let blob_gas_used = batch.column::<B>("blob_gas_used").ok();
106 let excess_blob_gas = batch.column::<B>("excess_blob_gas").ok();
107 let parent_beacon_block_root = batch.column::<B>("parent_beacon_block_root").ok();
108 let withdrawals_root = batch.column::<B>("withdrawals_root").ok();
109 let withdrawals = batch.column::<B>("withdrawals").ok();
110 let l1_block_number = batch.column::<UInt64Array>("l1_block_number").ok();
111 let send_count = batch.column::<B>("send_count").ok();
112 let send_root = batch.column::<B>("send_root").ok();
113 let mix_hash = batch.column::<B>("mix_hash").ok();
114
115 (0..batch.chunk.len())
116 .map(|idx| Self {
117 number: number.and_then(|arr| arr.get(idx)),
118 hash: map_binary(idx, hash),
119 parent_hash: map_binary(idx, parent_hash),
120 nonce: map_binary(idx, nonce),
121 sha3_uncles: map_binary(idx, sha3_uncles),
122 logs_bloom: map_binary(idx, logs_bloom),
123 transactions_root: map_binary(idx, transactions_root),
124 state_root: map_binary(idx, state_root),
125 receipts_root: map_binary(idx, receipts_root),
126 miner: map_binary(idx, miner),
127 difficulty: map_binary(idx, difficulty),
128 total_difficulty: map_binary(idx, total_difficulty),
129 extra_data: map_binary(idx, extra_data),
130 size: map_binary(idx, size),
131 gas_limit: map_binary(idx, gas_limit),
132 gas_used: map_binary(idx, gas_used),
133 timestamp: map_binary(idx, timestamp),
134 uncles: uncles.and_then(|arr| {
135 arr.get_idx(idx).map(|v| {
136 v.chunks(32)
137 .map(|chunk| chunk.try_into().unwrap())
138 .collect()
139 })
140 }),
141 base_fee_per_gas: map_binary(idx, base_fee_per_gas),
142 blob_gas_used: map_binary(idx, blob_gas_used),
143 excess_blob_gas: map_binary(idx, excess_blob_gas),
144 parent_beacon_block_root: map_binary(idx, parent_beacon_block_root),
145 withdrawals_root: map_binary(idx, withdrawals_root),
146 withdrawals: withdrawals
147 .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
148 l1_block_number: l1_block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
149 send_count: map_binary(idx, send_count),
150 send_root: map_binary(idx, send_root),
151 mix_hash: map_binary(idx, mix_hash),
152 })
153 .collect()
154 }
155}
156
157impl FromArrow for Transaction {
158 fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U>(batch: &ArrowBatch) -> Vec<Self> {
159 let block_hash = batch.column::<B>("block_hash").ok();
160 let block_number = batch.column::<UInt64Array>("block_number").ok();
161 let from = batch.column::<B>("from").ok();
162 let gas = batch.column::<B>("gas").ok();
163 let gas_price = batch.column::<B>("gas_price").ok();
164 let hash = batch.column::<B>("hash").ok();
165 let input = batch.column::<B>("input").ok();
166 let nonce = batch.column::<B>("nonce").ok();
167 let to = batch.column::<B>("to").ok();
168 let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
169 let value = batch.column::<B>("value").ok();
170 let v = batch.column::<B>("v").ok();
171 let r = batch.column::<B>("r").ok();
172 let s = batch.column::<B>("s").ok();
173 let y_parity = batch.column::<B>("y_parity").ok();
174 let max_priority_fee_per_gas = batch.column::<B>("max_priority_fee_per_gas").ok();
175 let max_fee_per_gas = batch.column::<B>("max_fee_per_gas").ok();
176 let chain_id = batch.column::<B>("chain_id").ok();
177 let access_list = batch.column::<B>("access_list").ok();
178 let authorization_list = batch.column::<B>("authorization_list").ok();
179 let max_fee_per_blob_gas = batch.column::<B>("max_fee_per_blob_gas").ok();
180 let blob_versioned_hashes = batch.column::<B>("blob_versioned_hashes").ok();
181 let cumulative_gas_used = batch.column::<B>("cumulative_gas_used").ok();
182 let effective_gas_price = batch.column::<B>("effective_gas_price").ok();
183 let gas_used = batch.column::<B>("gas_used").ok();
184 let contract_address = batch.column::<B>("contract_address").ok();
185 let logs_bloom = batch.column::<B>("logs_bloom").ok();
186 let type_ = batch.column::<UInt8Array>("type").ok();
187 let root = batch.column::<B>("root").ok();
188 let status = batch.column::<UInt8Array>("status").ok();
189 let l1_fee = batch.column::<B>("l1_fee").ok();
190 let l1_gas_price = batch.column::<B>("l1_gas_price").ok();
191 let l1_gas_used = batch.column::<B>("l1_gas_used").ok();
192 let l1_fee_scalar = batch.column::<B>("l1_fee_scalar").ok();
193 let gas_used_for_l1 = batch.column::<B>("gas_used_for_l1").ok();
194 let blob_gas_price = batch.column::<B>("blob_gas_price").ok();
195 let blob_gas_used = batch.column::<B>("blob_gas_used").ok();
196 let deposit_nonce = batch.column::<B>("deposit_nonce").ok();
197 let deposit_receipt_version = batch.column::<B>("deposit_receipt_version").ok();
198 let l1_base_fee_scalar = batch.column::<B>("l1_base_fee_scalar").ok();
199 let l1_blob_base_fee = batch.column::<B>("l1_blob_base_fee").ok();
200 let l1_blob_base_fee_scalar = batch.column::<B>("l1_blob_base_fee_scalar").ok();
201 let l1_block_number = batch.column::<B>("l1_block_number").ok();
202 let mint = batch.column::<B>("mint").ok();
203 let sighash = batch.column::<B>("sighash").ok();
204 let source_hash = batch.column::<B>("source_hash").ok();
205
206 (0..batch.chunk.len())
207 .map(|idx| Self {
208 block_hash: map_binary(idx, block_hash),
209 block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
210 from: map_binary(idx, from),
211 gas: map_binary(idx, gas),
212 gas_price: map_binary(idx, gas_price),
213 hash: map_binary(idx, hash),
214 input: map_binary(idx, input),
215 nonce: map_binary(idx, nonce),
216 to: map_binary(idx, to),
217 transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
218 value: map_binary(idx, value),
219 v: map_binary(idx, v),
220 r: map_binary(idx, r),
221 s: map_binary(idx, s),
222 y_parity: map_binary(idx, y_parity),
223 max_priority_fee_per_gas: map_binary(idx, max_priority_fee_per_gas),
224 max_fee_per_gas: map_binary(idx, max_fee_per_gas),
225 chain_id: map_binary(idx, chain_id),
226 access_list: access_list
227 .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
228 authorization_list: authorization_list
229 .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
230 max_fee_per_blob_gas: map_binary(idx, max_fee_per_blob_gas),
231 blob_versioned_hashes: blob_versioned_hashes.and_then(|arr| {
232 arr.get_idx(idx).map(|v| {
233 v.chunks(32)
234 .map(|chunk| chunk.try_into().unwrap())
235 .collect()
236 })
237 }),
238 cumulative_gas_used: map_binary(idx, cumulative_gas_used),
239 effective_gas_price: map_binary(idx, effective_gas_price),
240 gas_used: map_binary(idx, gas_used),
241 contract_address: map_binary(idx, contract_address),
242 logs_bloom: map_binary(idx, logs_bloom),
243 type_: type_.and_then(|arr| arr.get(idx).map(|v| v.into())),
244 root: map_binary(idx, root),
245 status: status.and_then(|arr| {
246 arr.get(idx)
247 .map(|v| hypersync_format::TransactionStatus::from_u8(v).unwrap())
248 }),
249 l1_fee: map_binary(idx, l1_fee),
250 l1_gas_price: map_binary(idx, l1_gas_price),
251 l1_gas_used: map_binary(idx, l1_gas_used),
252 l1_fee_scalar: l1_fee_scalar.and_then(|arr| {
253 arr.get_idx(idx)
254 .map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())
255 }),
256 gas_used_for_l1: map_binary(idx, gas_used_for_l1),
257 blob_gas_price: map_binary(idx, blob_gas_price),
258 blob_gas_used: map_binary(idx, blob_gas_used),
259 deposit_nonce: map_binary(idx, deposit_nonce),
260 deposit_receipt_version: map_binary(idx, deposit_receipt_version),
261 l1_base_fee_scalar: map_binary(idx, l1_base_fee_scalar),
262 l1_blob_base_fee: map_binary(idx, l1_blob_base_fee),
263 l1_blob_base_fee_scalar: map_binary(idx, l1_blob_base_fee_scalar),
264 l1_block_number: map_binary(idx, l1_block_number),
265 mint: map_binary(idx, mint),
266 sighash: map_binary(idx, sighash),
267 source_hash: map_binary(idx, source_hash),
268 })
269 .collect()
270 }
271}
272
273impl FromArrow for Log {
274 fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U>(batch: &ArrowBatch) -> Vec<Self> {
275 let removed = batch.column::<BooleanArray>("removed").ok();
276 let log_index = batch.column::<UInt64Array>("log_index").ok();
277 let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
278 let transaction_hash = batch.column::<B>("transaction_hash").ok();
279 let block_hash = batch.column::<B>("block_hash").ok();
280 let block_number = batch.column::<UInt64Array>("block_number").ok();
281 let address = batch.column::<B>("address").ok();
282 let data = batch.column::<B>("data").ok();
283 let topic0 = batch.column::<B>("topic0").ok();
284 let topic1 = batch.column::<B>("topic1").ok();
285 let topic2 = batch.column::<B>("topic2").ok();
286 let topic3 = batch.column::<B>("topic3").ok();
287
288 (0..batch.chunk.len())
289 .map(|idx| Self {
290 removed: removed.and_then(|arr| arr.get(idx)),
291 log_index: log_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
292 transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
293 transaction_hash: map_binary(idx, transaction_hash),
294 block_hash: map_binary(idx, block_hash),
295 block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
296 address: map_binary(idx, address),
297 data: map_binary(idx, data),
298 topics: {
299 let mut arr = ArrayVec::new();
300
301 arr.push(map_binary(idx, topic0));
302 arr.push(map_binary(idx, topic1));
303 arr.push(map_binary(idx, topic2));
304 arr.push(map_binary(idx, topic3));
305
306 arr
307 },
308 })
309 .collect()
310 }
311}
312
313impl FromArrow for Trace {
314 fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U: List<Item = str> + 'static>(
315 batch: &ArrowBatch,
316 ) -> Vec<Self> {
317 let from = batch.column::<B>("from").ok();
318 let to = batch.column::<B>("to").ok();
319 let call_type = batch.column::<U>("call_type").ok();
320 let gas = batch.column::<B>("gas").ok();
321 let input = batch.column::<B>("input").ok();
322 let init = batch.column::<B>("init").ok();
323 let value = batch.column::<B>("value").ok();
324 let author = batch.column::<B>("author").ok();
325 let reward_type = batch.column::<U>("reward_type").ok();
326 let block_hash = batch.column::<B>("block_hash").ok();
327 let block_number = batch.column::<UInt64Array>("block_number").ok();
328 let address = batch.column::<B>("address").ok();
329 let code = batch.column::<B>("code").ok();
330 let gas_used = batch.column::<B>("gas_used").ok();
331 let output = batch.column::<B>("output").ok();
332 let subtraces = batch.column::<UInt64Array>("subtraces").ok();
333 let trace_address = batch.column::<B>("trace_address").ok();
334 let transaction_hash = batch.column::<B>("transaction_hash").ok();
335 let transaction_position = batch.column::<UInt64Array>("transaction_position").ok();
336 let type_ = batch.column::<U>("type").ok();
337 let error = batch.column::<U>("error").ok();
338 let sighash = batch.column::<B>("sighash").ok();
339 let action_address = batch.column::<B>("action_address").ok();
340 let balance = batch.column::<B>("balance").ok();
341 let refund_address = batch.column::<B>("refund_address").ok();
342
343 (0..batch.chunk.len())
344 .map(|idx| Self {
345 from: map_binary(idx, from),
346 to: map_binary(idx, to),
347 call_type: call_type.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
348 gas: map_binary(idx, gas),
349 input: map_binary(idx, input),
350 init: map_binary(idx, init),
351 value: map_binary(idx, value),
352 author: map_binary(idx, author),
353 reward_type: reward_type.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
354 block_hash: map_binary(idx, block_hash),
355 block_number: block_number.and_then(|arr| arr.get(idx)),
356 address: map_binary(idx, address),
357 code: map_binary(idx, code),
358 gas_used: map_binary(idx, gas_used),
359 output: map_binary(idx, output),
360 subtraces: subtraces.and_then(|arr| arr.get(idx)),
361 trace_address: trace_address
362 .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
363 transaction_hash: map_binary(idx, transaction_hash),
364 transaction_position: transaction_position.and_then(|arr| arr.get(idx)),
365 type_: type_.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
366 error: error.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
367 sighash: map_binary(idx, sighash),
368 action_address: map_binary(idx, action_address),
369 balance: map_binary(idx, balance),
370 refund_address: map_binary(idx, refund_address),
371 })
372 .collect()
373 }
374}