1use arrayvec::ArrayVec;
2use polars_arrow::array::{
3 BinaryArray, BooleanArray, StaticArray, UInt64Array, UInt8Array, Utf8Array,
4};
5
6use crate::{
7 simple_types::{Block, Log, Trace, Transaction},
8 ArrowBatch,
9};
10
11pub trait FromArrow: Sized {
13 fn from_arrow(batch: &ArrowBatch) -> Vec<Self>;
15}
16
17fn map_binary<'a, T>(i: usize, arr: Option<&'a BinaryArray<i32>>) -> Option<T>
18where
19 T: TryFrom<&'a [u8]>,
20 <T as TryFrom<&'a [u8]>>::Error: std::fmt::Debug,
21{
22 arr.and_then(|arr| arr.get(i).map(|v| v.try_into().unwrap()))
23}
24
25impl FromArrow for Block {
26 fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
27 let number = batch.column::<UInt64Array>("number").ok();
28 let hash = batch.column::<BinaryArray<i32>>("hash").ok();
29 let parent_hash = batch.column::<BinaryArray<i32>>("parent_hash").ok();
30 let nonce = batch.column::<BinaryArray<i32>>("nonce").ok();
31 let sha3_uncles = batch.column::<BinaryArray<i32>>("sha3_uncles").ok();
32 let logs_bloom = batch.column::<BinaryArray<i32>>("logs_bloom").ok();
33 let transactions_root = batch.column::<BinaryArray<i32>>("transactions_root").ok();
34 let state_root = batch.column::<BinaryArray<i32>>("state_root").ok();
35 let receipts_root = batch.column::<BinaryArray<i32>>("receipts_root").ok();
36 let miner = batch.column::<BinaryArray<i32>>("miner").ok();
37 let difficulty = batch.column::<BinaryArray<i32>>("difficulty").ok();
38 let total_difficulty = batch.column::<BinaryArray<i32>>("total_difficulty").ok();
39 let extra_data = batch.column::<BinaryArray<i32>>("extra_data").ok();
40 let size = batch.column::<BinaryArray<i32>>("size").ok();
41 let gas_limit = batch.column::<BinaryArray<i32>>("gas_limit").ok();
42 let gas_used = batch.column::<BinaryArray<i32>>("gas_used").ok();
43 let timestamp = batch.column::<BinaryArray<i32>>("timestamp").ok();
44 let uncles = batch.column::<BinaryArray<i32>>("uncles").ok();
45 let base_fee_per_gas = batch.column::<BinaryArray<i32>>("base_fee_per_gas").ok();
46 let blob_gas_used = batch.column::<BinaryArray<i32>>("blob_gas_used").ok();
47 let excess_blob_gas = batch.column::<BinaryArray<i32>>("excess_blob_gas").ok();
48 let parent_beacon_block_root = batch
49 .column::<BinaryArray<i32>>("parent_beacon_block_root")
50 .ok();
51 let withdrawals_root = batch.column::<BinaryArray<i32>>("withdrawals_root").ok();
52 let withdrawals = batch.column::<BinaryArray<i32>>("withdrawals").ok();
53 let l1_block_number = batch.column::<UInt64Array>("l1_block_number").ok();
54 let send_count = batch.column::<BinaryArray<i32>>("send_count").ok();
55 let send_root = batch.column::<BinaryArray<i32>>("send_root").ok();
56 let mix_hash = batch.column::<BinaryArray<i32>>("mix_hash").ok();
57
58 (0..batch.chunk.len())
59 .map(|idx| Self {
60 number: number.and_then(|arr| arr.get(idx)),
61 hash: map_binary(idx, hash),
62 parent_hash: map_binary(idx, parent_hash),
63 nonce: map_binary(idx, nonce),
64 sha3_uncles: map_binary(idx, sha3_uncles),
65 logs_bloom: map_binary(idx, logs_bloom),
66 transactions_root: map_binary(idx, transactions_root),
67 state_root: map_binary(idx, state_root),
68 receipts_root: map_binary(idx, receipts_root),
69 miner: map_binary(idx, miner),
70 difficulty: map_binary(idx, difficulty),
71 total_difficulty: map_binary(idx, total_difficulty),
72 extra_data: map_binary(idx, extra_data),
73 size: map_binary(idx, size),
74 gas_limit: map_binary(idx, gas_limit),
75 gas_used: map_binary(idx, gas_used),
76 timestamp: map_binary(idx, timestamp),
77 uncles: uncles.and_then(|arr| {
78 arr.get(idx).map(|v| {
79 v.chunks(32)
80 .map(|chunk| chunk.try_into().unwrap())
81 .collect()
82 })
83 }),
84 base_fee_per_gas: map_binary(idx, base_fee_per_gas),
85 blob_gas_used: map_binary(idx, blob_gas_used),
86 excess_blob_gas: map_binary(idx, excess_blob_gas),
87 parent_beacon_block_root: map_binary(idx, parent_beacon_block_root),
88 withdrawals_root: map_binary(idx, withdrawals_root),
89 withdrawals: withdrawals
90 .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
91 l1_block_number: l1_block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
92 send_count: map_binary(idx, send_count),
93 send_root: map_binary(idx, send_root),
94 mix_hash: map_binary(idx, mix_hash),
95 })
96 .collect()
97 }
98}
99
100impl FromArrow for Transaction {
101 fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
102 let block_hash = batch.column::<BinaryArray<i32>>("block_hash").ok();
103 let block_number = batch.column::<UInt64Array>("block_number").ok();
104 let from = batch.column::<BinaryArray<i32>>("from").ok();
105 let gas = batch.column::<BinaryArray<i32>>("gas").ok();
106 let gas_price = batch.column::<BinaryArray<i32>>("gas_price").ok();
107 let hash = batch.column::<BinaryArray<i32>>("hash").ok();
108 let input = batch.column::<BinaryArray<i32>>("input").ok();
109 let nonce = batch.column::<BinaryArray<i32>>("nonce").ok();
110 let to = batch.column::<BinaryArray<i32>>("to").ok();
111 let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
112 let value = batch.column::<BinaryArray<i32>>("value").ok();
113 let v = batch.column::<BinaryArray<i32>>("v").ok();
114 let r = batch.column::<BinaryArray<i32>>("r").ok();
115 let s = batch.column::<BinaryArray<i32>>("s").ok();
116 let y_parity = batch.column::<BinaryArray<i32>>("y_parity").ok();
117 let max_priority_fee_per_gas = batch
118 .column::<BinaryArray<i32>>("max_priority_fee_per_gas")
119 .ok();
120 let max_fee_per_gas = batch.column::<BinaryArray<i32>>("max_fee_per_gas").ok();
121 let chain_id = batch.column::<BinaryArray<i32>>("chain_id").ok();
122 let access_list = batch.column::<BinaryArray<i32>>("access_list").ok();
123 let max_fee_per_blob_gas = batch
124 .column::<BinaryArray<i32>>("max_fee_per_blob_gas")
125 .ok();
126 let blob_versioned_hashes = batch
127 .column::<BinaryArray<i32>>("blob_versioned_hashes")
128 .ok();
129 let cumulative_gas_used = batch.column::<BinaryArray<i32>>("cumulative_gas_used").ok();
130 let effective_gas_price = batch.column::<BinaryArray<i32>>("effective_gas_price").ok();
131 let gas_used = batch.column::<BinaryArray<i32>>("gas_used").ok();
132 let contract_address = batch.column::<BinaryArray<i32>>("contract_address").ok();
133 let logs_bloom = batch.column::<BinaryArray<i32>>("logs_bloom").ok();
134 let kind = batch.column::<UInt8Array>("type").ok();
135 let root = batch.column::<BinaryArray<i32>>("root").ok();
136 let status = batch.column::<UInt8Array>("status").ok();
137 let l1_fee = batch.column::<BinaryArray<i32>>("l1_fee").ok();
138 let l1_gas_price = batch.column::<BinaryArray<i32>>("l1_gas_price").ok();
139 let l1_gas_used = batch.column::<BinaryArray<i32>>("l1_gas_used").ok();
140 let l1_fee_scalar = batch.column::<BinaryArray<i32>>("l1_fee_scalar").ok();
141 let gas_used_for_l1 = batch.column::<BinaryArray<i32>>("gas_used_for_l1").ok();
142
143 (0..batch.chunk.len())
144 .map(|idx| Self {
145 block_hash: map_binary(idx, block_hash),
146 block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
147 from: map_binary(idx, from),
148 gas: map_binary(idx, gas),
149 gas_price: map_binary(idx, gas_price),
150 hash: map_binary(idx, hash),
151 input: map_binary(idx, input),
152 nonce: map_binary(idx, nonce),
153 to: map_binary(idx, to),
154 transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
155 value: map_binary(idx, value),
156 v: map_binary(idx, v),
157 r: map_binary(idx, r),
158 s: map_binary(idx, s),
159 y_parity: map_binary(idx, y_parity),
160 max_priority_fee_per_gas: map_binary(idx, max_priority_fee_per_gas),
161 max_fee_per_gas: map_binary(idx, max_fee_per_gas),
162 chain_id: map_binary(idx, chain_id),
163 access_list: access_list
164 .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
165 max_fee_per_blob_gas: map_binary(idx, max_fee_per_blob_gas),
166 blob_versioned_hashes: blob_versioned_hashes.and_then(|arr| {
167 arr.get(idx).map(|v| {
168 v.chunks(32)
169 .map(|chunk| chunk.try_into().unwrap())
170 .collect()
171 })
172 }),
173 cumulative_gas_used: map_binary(idx, cumulative_gas_used),
174 effective_gas_price: map_binary(idx, effective_gas_price),
175 gas_used: map_binary(idx, gas_used),
176 contract_address: map_binary(idx, contract_address),
177 logs_bloom: map_binary(idx, logs_bloom),
178 kind: kind.and_then(|arr| arr.get(idx).map(|v| v.into())),
179 root: map_binary(idx, root),
180 status: status.and_then(|arr| {
181 arr.get(idx)
182 .map(|v| hypersync_format::TransactionStatus::from_u8(v).unwrap())
183 }),
184 l1_fee: map_binary(idx, l1_fee),
185 l1_gas_price: map_binary(idx, l1_gas_price),
186 l1_gas_used: map_binary(idx, l1_gas_used),
187 l1_fee_scalar: l1_fee_scalar.and_then(|arr| {
188 arr.get(idx)
189 .map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())
190 }),
191 gas_used_for_l1: map_binary(idx, gas_used_for_l1),
192 })
193 .collect()
194 }
195}
196
197impl FromArrow for Log {
198 fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
199 let removed = batch.column::<BooleanArray>("removed").ok();
200 let log_index = batch.column::<UInt64Array>("log_index").ok();
201 let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
202 let transaction_hash = batch.column::<BinaryArray<i32>>("transaction_hash").ok();
203 let block_hash = batch.column::<BinaryArray<i32>>("block_hash").ok();
204 let block_number = batch.column::<UInt64Array>("block_number").ok();
205 let address = batch.column::<BinaryArray<i32>>("address").ok();
206 let data = batch.column::<BinaryArray<i32>>("data").ok();
207 let topic0 = batch.column::<BinaryArray<i32>>("topic0").ok();
208 let topic1 = batch.column::<BinaryArray<i32>>("topic1").ok();
209 let topic2 = batch.column::<BinaryArray<i32>>("topic2").ok();
210 let topic3 = batch.column::<BinaryArray<i32>>("topic3").ok();
211
212 (0..batch.chunk.len())
213 .map(|idx| Self {
214 removed: removed.and_then(|arr| arr.get(idx)),
215 log_index: log_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
216 transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
217 transaction_hash: map_binary(idx, transaction_hash),
218 block_hash: map_binary(idx, block_hash),
219 block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
220 address: map_binary(idx, address),
221 data: map_binary(idx, data),
222 topics: {
223 let mut arr = ArrayVec::new();
224
225 arr.push(map_binary(idx, topic0));
226 arr.push(map_binary(idx, topic1));
227 arr.push(map_binary(idx, topic2));
228 arr.push(map_binary(idx, topic3));
229
230 arr
231 },
232 })
233 .collect()
234 }
235}
236
237impl FromArrow for Trace {
238 fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
239 let from = batch.column::<BinaryArray<i32>>("from").ok();
240 let to = batch.column::<BinaryArray<i32>>("to").ok();
241 let call_type = batch.column::<Utf8Array<i32>>("call_type").ok();
242 let gas = batch.column::<BinaryArray<i32>>("gas").ok();
243 let input = batch.column::<BinaryArray<i32>>("input").ok();
244 let init = batch.column::<BinaryArray<i32>>("init").ok();
245 let value = batch.column::<BinaryArray<i32>>("value").ok();
246 let author = batch.column::<BinaryArray<i32>>("author").ok();
247 let reward_type = batch.column::<Utf8Array<i32>>("reward_type").ok();
248 let block_hash = batch.column::<BinaryArray<i32>>("block_hash").ok();
249 let block_number = batch.column::<UInt64Array>("block_number").ok();
250 let address = batch.column::<BinaryArray<i32>>("address").ok();
251 let code = batch.column::<BinaryArray<i32>>("code").ok();
252 let gas_used = batch.column::<BinaryArray<i32>>("gas_used").ok();
253 let output = batch.column::<BinaryArray<i32>>("output").ok();
254 let subtraces = batch.column::<UInt64Array>("subtraces").ok();
255 let trace_address = batch.column::<BinaryArray<i32>>("trace_address").ok();
256 let transaction_hash = batch.column::<BinaryArray<i32>>("transaction_hash").ok();
257 let transaction_position = batch.column::<UInt64Array>("transaction_position").ok();
258 let kind = batch.column::<Utf8Array<i32>>("type").ok();
259 let error = batch.column::<Utf8Array<i32>>("error").ok();
260
261 (0..batch.chunk.len())
262 .map(|idx| Self {
263 from: map_binary(idx, from),
264 to: map_binary(idx, to),
265 call_type: call_type.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
266 gas: map_binary(idx, gas),
267 input: map_binary(idx, input),
268 init: map_binary(idx, init),
269 value: map_binary(idx, value),
270 author: map_binary(idx, author),
271 reward_type: reward_type.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
272 block_hash: map_binary(idx, block_hash),
273 block_number: block_number.and_then(|arr| arr.get(idx)),
274 address: map_binary(idx, address),
275 code: map_binary(idx, code),
276 gas_used: map_binary(idx, gas_used),
277 output: map_binary(idx, output),
278 subtraces: subtraces.and_then(|arr| arr.get(idx)),
279 trace_address: trace_address
280 .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
281 transaction_hash: map_binary(idx, transaction_hash),
282 transaction_position: transaction_position.and_then(|arr| arr.get(idx)),
283 kind: kind.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
284 error: error.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
285 })
286 .collect()
287 }
288}