1use arrayvec::ArrayVec;
2use polars_arrow::array::{
3 BinaryArray, BooleanArray, StaticArray, UInt64Array, UInt8Array, Utf8Array,
4};
5
6use crate::{
7 simple_types::{Block, Log, Trace, Transaction},
8 ArrowBatch,
9};
10
11pub trait FromArrow: Sized {
13 fn from_arrow(batch: &ArrowBatch) -> Vec<Self>;
15}
16
17fn map_binary<'a, T>(i: usize, arr: Option<&'a BinaryArray<i32>>) -> Option<T>
18where
19 T: TryFrom<&'a [u8]>,
20 <T as TryFrom<&'a [u8]>>::Error: std::fmt::Debug,
21{
22 arr.and_then(|arr| arr.get(i).map(|v| v.try_into().unwrap()))
23}
24
25impl FromArrow for Block {
26 fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
27 let number = batch.column::<UInt64Array>("number").ok();
28 let hash = batch.column::<BinaryArray<i32>>("hash").ok();
29 let parent_hash = batch.column::<BinaryArray<i32>>("parent_hash").ok();
30 let nonce = batch.column::<BinaryArray<i32>>("nonce").ok();
31 let sha3_uncles = batch.column::<BinaryArray<i32>>("sha3_uncles").ok();
32 let logs_bloom = batch.column::<BinaryArray<i32>>("logs_bloom").ok();
33 let transactions_root = batch.column::<BinaryArray<i32>>("transactions_root").ok();
34 let state_root = batch.column::<BinaryArray<i32>>("state_root").ok();
35 let receipts_root = batch.column::<BinaryArray<i32>>("receipts_root").ok();
36 let miner = batch.column::<BinaryArray<i32>>("miner").ok();
37 let difficulty = batch.column::<BinaryArray<i32>>("difficulty").ok();
38 let total_difficulty = batch.column::<BinaryArray<i32>>("total_difficulty").ok();
39 let extra_data = batch.column::<BinaryArray<i32>>("extra_data").ok();
40 let size = batch.column::<BinaryArray<i32>>("size").ok();
41 let gas_limit = batch.column::<BinaryArray<i32>>("gas_limit").ok();
42 let gas_used = batch.column::<BinaryArray<i32>>("gas_used").ok();
43 let timestamp = batch.column::<BinaryArray<i32>>("timestamp").ok();
44 let uncles = batch.column::<BinaryArray<i32>>("uncles").ok();
45 let base_fee_per_gas = batch.column::<BinaryArray<i32>>("base_fee_per_gas").ok();
46 let blob_gas_used = batch.column::<BinaryArray<i32>>("blob_gas_used").ok();
47 let excess_blob_gas = batch.column::<BinaryArray<i32>>("excess_blob_gas").ok();
48 let parent_beacon_block_root = batch
49 .column::<BinaryArray<i32>>("parent_beacon_block_root")
50 .ok();
51 let withdrawals_root = batch.column::<BinaryArray<i32>>("withdrawals_root").ok();
52 let withdrawals = batch.column::<BinaryArray<i32>>("withdrawals").ok();
53 let l1_block_number = batch.column::<UInt64Array>("l1_block_number").ok();
54 let send_count = batch.column::<BinaryArray<i32>>("send_count").ok();
55 let send_root = batch.column::<BinaryArray<i32>>("send_root").ok();
56 let mix_hash = batch.column::<BinaryArray<i32>>("mix_hash").ok();
57
58 (0..batch.chunk.len())
59 .map(|idx| Self {
60 number: number.and_then(|arr| arr.get(idx)),
61 hash: map_binary(idx, hash),
62 parent_hash: map_binary(idx, parent_hash),
63 nonce: map_binary(idx, nonce),
64 sha3_uncles: map_binary(idx, sha3_uncles),
65 logs_bloom: map_binary(idx, logs_bloom),
66 transactions_root: map_binary(idx, transactions_root),
67 state_root: map_binary(idx, state_root),
68 receipts_root: map_binary(idx, receipts_root),
69 miner: map_binary(idx, miner),
70 difficulty: map_binary(idx, difficulty),
71 total_difficulty: map_binary(idx, total_difficulty),
72 extra_data: map_binary(idx, extra_data),
73 size: map_binary(idx, size),
74 gas_limit: map_binary(idx, gas_limit),
75 gas_used: map_binary(idx, gas_used),
76 timestamp: map_binary(idx, timestamp),
77 uncles: uncles.and_then(|arr| {
78 arr.get(idx).map(|v| {
79 v.chunks(32)
80 .map(|chunk| chunk.try_into().unwrap())
81 .collect()
82 })
83 }),
84 base_fee_per_gas: map_binary(idx, base_fee_per_gas),
85 blob_gas_used: map_binary(idx, blob_gas_used),
86 excess_blob_gas: map_binary(idx, excess_blob_gas),
87 parent_beacon_block_root: map_binary(idx, parent_beacon_block_root),
88 withdrawals_root: map_binary(idx, withdrawals_root),
89 withdrawals: withdrawals
90 .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
91 l1_block_number: l1_block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
92 send_count: map_binary(idx, send_count),
93 send_root: map_binary(idx, send_root),
94 mix_hash: map_binary(idx, mix_hash),
95 })
96 .collect()
97 }
98}
99
100impl FromArrow for Transaction {
101 fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
102 let block_hash = batch.column::<BinaryArray<i32>>("block_hash").ok();
103 let block_number = batch.column::<UInt64Array>("block_number").ok();
104 let from = batch.column::<BinaryArray<i32>>("from").ok();
105 let gas = batch.column::<BinaryArray<i32>>("gas").ok();
106 let gas_price = batch.column::<BinaryArray<i32>>("gas_price").ok();
107 let hash = batch.column::<BinaryArray<i32>>("hash").ok();
108 let input = batch.column::<BinaryArray<i32>>("input").ok();
109 let nonce = batch.column::<BinaryArray<i32>>("nonce").ok();
110 let to = batch.column::<BinaryArray<i32>>("to").ok();
111 let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
112 let value = batch.column::<BinaryArray<i32>>("value").ok();
113 let v = batch.column::<BinaryArray<i32>>("v").ok();
114 let r = batch.column::<BinaryArray<i32>>("r").ok();
115 let s = batch.column::<BinaryArray<i32>>("s").ok();
116 let y_parity = batch.column::<BinaryArray<i32>>("y_parity").ok();
117 let max_priority_fee_per_gas = batch
118 .column::<BinaryArray<i32>>("max_priority_fee_per_gas")
119 .ok();
120 let max_fee_per_gas = batch.column::<BinaryArray<i32>>("max_fee_per_gas").ok();
121 let chain_id = batch.column::<BinaryArray<i32>>("chain_id").ok();
122 let access_list = batch.column::<BinaryArray<i32>>("access_list").ok();
123 let authorization_list = batch.column::<BinaryArray<i32>>("authorization_list").ok();
124 let max_fee_per_blob_gas = batch
125 .column::<BinaryArray<i32>>("max_fee_per_blob_gas")
126 .ok();
127 let blob_versioned_hashes = batch
128 .column::<BinaryArray<i32>>("blob_versioned_hashes")
129 .ok();
130 let cumulative_gas_used = batch.column::<BinaryArray<i32>>("cumulative_gas_used").ok();
131 let effective_gas_price = batch.column::<BinaryArray<i32>>("effective_gas_price").ok();
132 let gas_used = batch.column::<BinaryArray<i32>>("gas_used").ok();
133 let contract_address = batch.column::<BinaryArray<i32>>("contract_address").ok();
134 let logs_bloom = batch.column::<BinaryArray<i32>>("logs_bloom").ok();
135 let kind = batch.column::<UInt8Array>("type").ok();
136 let root = batch.column::<BinaryArray<i32>>("root").ok();
137 let status = batch.column::<UInt8Array>("status").ok();
138 let l1_fee = batch.column::<BinaryArray<i32>>("l1_fee").ok();
139 let l1_gas_price = batch.column::<BinaryArray<i32>>("l1_gas_price").ok();
140 let l1_gas_used = batch.column::<BinaryArray<i32>>("l1_gas_used").ok();
141 let l1_fee_scalar = batch.column::<BinaryArray<i32>>("l1_fee_scalar").ok();
142 let gas_used_for_l1 = batch.column::<BinaryArray<i32>>("gas_used_for_l1").ok();
143
144 (0..batch.chunk.len())
145 .map(|idx| Self {
146 block_hash: map_binary(idx, block_hash),
147 block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
148 from: map_binary(idx, from),
149 gas: map_binary(idx, gas),
150 gas_price: map_binary(idx, gas_price),
151 hash: map_binary(idx, hash),
152 input: map_binary(idx, input),
153 nonce: map_binary(idx, nonce),
154 to: map_binary(idx, to),
155 transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
156 value: map_binary(idx, value),
157 v: map_binary(idx, v),
158 r: map_binary(idx, r),
159 s: map_binary(idx, s),
160 y_parity: map_binary(idx, y_parity),
161 max_priority_fee_per_gas: map_binary(idx, max_priority_fee_per_gas),
162 max_fee_per_gas: map_binary(idx, max_fee_per_gas),
163 chain_id: map_binary(idx, chain_id),
164 access_list: access_list
165 .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
166 authorization_list: authorization_list
167 .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
168 max_fee_per_blob_gas: map_binary(idx, max_fee_per_blob_gas),
169 blob_versioned_hashes: blob_versioned_hashes.and_then(|arr| {
170 arr.get(idx).map(|v| {
171 v.chunks(32)
172 .map(|chunk| chunk.try_into().unwrap())
173 .collect()
174 })
175 }),
176 cumulative_gas_used: map_binary(idx, cumulative_gas_used),
177 effective_gas_price: map_binary(idx, effective_gas_price),
178 gas_used: map_binary(idx, gas_used),
179 contract_address: map_binary(idx, contract_address),
180 logs_bloom: map_binary(idx, logs_bloom),
181 kind: kind.and_then(|arr| arr.get(idx).map(|v| v.into())),
182 root: map_binary(idx, root),
183 status: status.and_then(|arr| {
184 arr.get(idx)
185 .map(|v| hypersync_format::TransactionStatus::from_u8(v).unwrap())
186 }),
187 l1_fee: map_binary(idx, l1_fee),
188 l1_gas_price: map_binary(idx, l1_gas_price),
189 l1_gas_used: map_binary(idx, l1_gas_used),
190 l1_fee_scalar: l1_fee_scalar.and_then(|arr| {
191 arr.get(idx)
192 .map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())
193 }),
194 gas_used_for_l1: map_binary(idx, gas_used_for_l1),
195 })
196 .collect()
197 }
198}
199
200impl FromArrow for Log {
201 fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
202 let removed = batch.column::<BooleanArray>("removed").ok();
203 let log_index = batch.column::<UInt64Array>("log_index").ok();
204 let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
205 let transaction_hash = batch.column::<BinaryArray<i32>>("transaction_hash").ok();
206 let block_hash = batch.column::<BinaryArray<i32>>("block_hash").ok();
207 let block_number = batch.column::<UInt64Array>("block_number").ok();
208 let address = batch.column::<BinaryArray<i32>>("address").ok();
209 let data = batch.column::<BinaryArray<i32>>("data").ok();
210 let topic0 = batch.column::<BinaryArray<i32>>("topic0").ok();
211 let topic1 = batch.column::<BinaryArray<i32>>("topic1").ok();
212 let topic2 = batch.column::<BinaryArray<i32>>("topic2").ok();
213 let topic3 = batch.column::<BinaryArray<i32>>("topic3").ok();
214
215 (0..batch.chunk.len())
216 .map(|idx| Self {
217 removed: removed.and_then(|arr| arr.get(idx)),
218 log_index: log_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
219 transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
220 transaction_hash: map_binary(idx, transaction_hash),
221 block_hash: map_binary(idx, block_hash),
222 block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
223 address: map_binary(idx, address),
224 data: map_binary(idx, data),
225 topics: {
226 let mut arr = ArrayVec::new();
227
228 arr.push(map_binary(idx, topic0));
229 arr.push(map_binary(idx, topic1));
230 arr.push(map_binary(idx, topic2));
231 arr.push(map_binary(idx, topic3));
232
233 arr
234 },
235 })
236 .collect()
237 }
238}
239
240impl FromArrow for Trace {
241 fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
242 let from = batch.column::<BinaryArray<i32>>("from").ok();
243 let to = batch.column::<BinaryArray<i32>>("to").ok();
244 let call_type = batch.column::<Utf8Array<i32>>("call_type").ok();
245 let gas = batch.column::<BinaryArray<i32>>("gas").ok();
246 let input = batch.column::<BinaryArray<i32>>("input").ok();
247 let init = batch.column::<BinaryArray<i32>>("init").ok();
248 let value = batch.column::<BinaryArray<i32>>("value").ok();
249 let author = batch.column::<BinaryArray<i32>>("author").ok();
250 let reward_type = batch.column::<Utf8Array<i32>>("reward_type").ok();
251 let block_hash = batch.column::<BinaryArray<i32>>("block_hash").ok();
252 let block_number = batch.column::<UInt64Array>("block_number").ok();
253 let address = batch.column::<BinaryArray<i32>>("address").ok();
254 let code = batch.column::<BinaryArray<i32>>("code").ok();
255 let gas_used = batch.column::<BinaryArray<i32>>("gas_used").ok();
256 let output = batch.column::<BinaryArray<i32>>("output").ok();
257 let subtraces = batch.column::<UInt64Array>("subtraces").ok();
258 let trace_address = batch.column::<BinaryArray<i32>>("trace_address").ok();
259 let transaction_hash = batch.column::<BinaryArray<i32>>("transaction_hash").ok();
260 let transaction_position = batch.column::<UInt64Array>("transaction_position").ok();
261 let kind = batch.column::<Utf8Array<i32>>("type").ok();
262 let error = batch.column::<Utf8Array<i32>>("error").ok();
263
264 (0..batch.chunk.len())
265 .map(|idx| Self {
266 from: map_binary(idx, from),
267 to: map_binary(idx, to),
268 call_type: call_type.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
269 gas: map_binary(idx, gas),
270 input: map_binary(idx, input),
271 init: map_binary(idx, init),
272 value: map_binary(idx, value),
273 author: map_binary(idx, author),
274 reward_type: reward_type.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
275 block_hash: map_binary(idx, block_hash),
276 block_number: block_number.and_then(|arr| arr.get(idx)),
277 address: map_binary(idx, address),
278 code: map_binary(idx, code),
279 gas_used: map_binary(idx, gas_used),
280 output: map_binary(idx, output),
281 subtraces: subtraces.and_then(|arr| arr.get(idx)),
282 trace_address: trace_address
283 .and_then(|arr| arr.get(idx).map(|v| bincode::deserialize(v).unwrap())),
284 transaction_hash: map_binary(idx, transaction_hash),
285 transaction_position: transaction_position.and_then(|arr| arr.get(idx)),
286 kind: kind.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
287 error: error.and_then(|arr| arr.get(idx).map(|v| v.to_owned())),
288 })
289 .collect()
290 }
291}