1use arrayvec::ArrayVec;
2use polars_arrow::array::{
3 BinaryArray, BinaryViewArray, BooleanArray, StaticArray, UInt64Array, UInt8Array, Utf8Array,
4 Utf8ViewArray,
5};
6
7use crate::{
8 simple_types::{Block, Log, Trace, Transaction},
9 ArrowBatch,
10};
11
12pub trait FromArrow: Sized {
14 fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U: List<Item = str> + 'static>(
18 batch: &ArrowBatch,
19 ) -> Vec<Self>;
20
21 fn from_arrow(batch: &ArrowBatch) -> Vec<Self> {
25 Self::from_arrow_bin_array::<BinaryArray<i32>, Utf8Array<i32>>(batch)
26 }
27
28 fn from_arrow_bin_view_array(batch: &ArrowBatch) -> Vec<Self> {
32 Self::from_arrow_bin_array::<BinaryViewArray, Utf8ViewArray>(batch)
33 }
34}
35
/// Index-based access to the elements of an array-like column.
///
/// Lets the decoders in this module read different concrete array types through a
/// single generic parameter. `Item` may be unsized, so byte slices (`[u8]`) and string
/// slices (`str`) are both supported.
pub trait List {
    /// Borrowed element type, e.g. `[u8]` or `str`.
    type Item: ?Sized;

    /// Returns the element at position `i`, or `None` when the implementation has no
    /// value there.
    fn get_idx(&self, i: usize) -> Option<&Self::Item>;
}
43
44impl List for BinaryArray<i32> {
45 type Item = [u8];
46
47 fn get_idx(&self, i: usize) -> Option<&[u8]> {
48 self.get(i)
49 }
50}
51impl List for BinaryViewArray {
52 type Item = [u8];
53
54 fn get_idx(&self, i: usize) -> Option<&[u8]> {
55 self.get(i)
56 }
57}
58
59impl List for Utf8Array<i32> {
60 type Item = str;
61
62 fn get_idx(&self, i: usize) -> Option<&str> {
63 self.get(i)
64 }
65}
66
67impl List for Utf8ViewArray {
68 type Item = str;
69
70 fn get_idx(&self, i: usize) -> Option<&str> {
71 self.get(i)
72 }
73}
74
75fn map_binary<'a, T, B>(i: usize, arr: Option<&'a B>) -> Option<T>
76where
77 T: TryFrom<&'a [u8]>,
78 <T as TryFrom<&'a [u8]>>::Error: std::fmt::Debug,
79 B: List<Item = [u8]>,
80{
81 arr.and_then(|arr| arr.get_idx(i).map(|v| T::try_from(v).unwrap()))
82}
83
84impl FromArrow for Block {
85 fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U>(batch: &ArrowBatch) -> Vec<Self> {
86 let number = batch.column::<UInt64Array>("number").ok();
87 let hash = batch.column::<B>("hash").ok();
88 let parent_hash = batch.column::<B>("parent_hash").ok();
89 let nonce = batch.column::<B>("nonce").ok();
90 let sha3_uncles = batch.column::<B>("sha3_uncles").ok();
91 let logs_bloom = batch.column::<B>("logs_bloom").ok();
92 let transactions_root = batch.column::<B>("transactions_root").ok();
93 let state_root = batch.column::<B>("state_root").ok();
94 let receipts_root = batch.column::<B>("receipts_root").ok();
95 let miner = batch.column::<B>("miner").ok();
96 let difficulty = batch.column::<B>("difficulty").ok();
97 let total_difficulty = batch.column::<B>("total_difficulty").ok();
98 let extra_data = batch.column::<B>("extra_data").ok();
99 let size = batch.column::<B>("size").ok();
100 let gas_limit = batch.column::<B>("gas_limit").ok();
101 let gas_used = batch.column::<B>("gas_used").ok();
102 let timestamp = batch.column::<B>("timestamp").ok();
103 let uncles = batch.column::<B>("uncles").ok();
104 let base_fee_per_gas = batch.column::<B>("base_fee_per_gas").ok();
105 let blob_gas_used = batch.column::<B>("blob_gas_used").ok();
106 let excess_blob_gas = batch.column::<B>("excess_blob_gas").ok();
107 let parent_beacon_block_root = batch.column::<B>("parent_beacon_block_root").ok();
108 let withdrawals_root = batch.column::<B>("withdrawals_root").ok();
109 let withdrawals = batch.column::<B>("withdrawals").ok();
110 let l1_block_number = batch.column::<UInt64Array>("l1_block_number").ok();
111 let send_count = batch.column::<B>("send_count").ok();
112 let send_root = batch.column::<B>("send_root").ok();
113 let mix_hash = batch.column::<B>("mix_hash").ok();
114
115 (0..batch.chunk.len())
116 .map(|idx| Self {
117 number: number.and_then(|arr| arr.get(idx)),
118 hash: map_binary(idx, hash),
119 parent_hash: map_binary(idx, parent_hash),
120 nonce: map_binary(idx, nonce),
121 sha3_uncles: map_binary(idx, sha3_uncles),
122 logs_bloom: map_binary(idx, logs_bloom),
123 transactions_root: map_binary(idx, transactions_root),
124 state_root: map_binary(idx, state_root),
125 receipts_root: map_binary(idx, receipts_root),
126 miner: map_binary(idx, miner),
127 difficulty: map_binary(idx, difficulty),
128 total_difficulty: map_binary(idx, total_difficulty),
129 extra_data: map_binary(idx, extra_data),
130 size: map_binary(idx, size),
131 gas_limit: map_binary(idx, gas_limit),
132 gas_used: map_binary(idx, gas_used),
133 timestamp: map_binary(idx, timestamp),
134 uncles: uncles.and_then(|arr| {
135 arr.get_idx(idx).map(|v| {
136 v.chunks(32)
137 .map(|chunk| chunk.try_into().unwrap())
138 .collect()
139 })
140 }),
141 base_fee_per_gas: map_binary(idx, base_fee_per_gas),
142 blob_gas_used: map_binary(idx, blob_gas_used),
143 excess_blob_gas: map_binary(idx, excess_blob_gas),
144 parent_beacon_block_root: map_binary(idx, parent_beacon_block_root),
145 withdrawals_root: map_binary(idx, withdrawals_root),
146 withdrawals: withdrawals
147 .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
148 l1_block_number: l1_block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
149 send_count: map_binary(idx, send_count),
150 send_root: map_binary(idx, send_root),
151 mix_hash: map_binary(idx, mix_hash),
152 })
153 .collect()
154 }
155}
156
157impl FromArrow for Transaction {
158 fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U>(batch: &ArrowBatch) -> Vec<Self> {
159 let block_hash = batch.column::<B>("block_hash").ok();
160 let block_number = batch.column::<UInt64Array>("block_number").ok();
161 let from = batch.column::<B>("from").ok();
162 let gas = batch.column::<B>("gas").ok();
163 let gas_price = batch.column::<B>("gas_price").ok();
164 let hash = batch.column::<B>("hash").ok();
165 let input = batch.column::<B>("input").ok();
166 let nonce = batch.column::<B>("nonce").ok();
167 let to = batch.column::<B>("to").ok();
168 let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
169 let value = batch.column::<B>("value").ok();
170 let v = batch.column::<B>("v").ok();
171 let r = batch.column::<B>("r").ok();
172 let s = batch.column::<B>("s").ok();
173 let y_parity = batch.column::<B>("y_parity").ok();
174 let max_priority_fee_per_gas = batch.column::<B>("max_priority_fee_per_gas").ok();
175 let max_fee_per_gas = batch.column::<B>("max_fee_per_gas").ok();
176 let chain_id = batch.column::<B>("chain_id").ok();
177 let access_list = batch.column::<B>("access_list").ok();
178 let authorization_list = batch.column::<B>("authorization_list").ok();
179 let max_fee_per_blob_gas = batch.column::<B>("max_fee_per_blob_gas").ok();
180 let blob_versioned_hashes = batch.column::<B>("blob_versioned_hashes").ok();
181 let cumulative_gas_used = batch.column::<B>("cumulative_gas_used").ok();
182 let effective_gas_price = batch.column::<B>("effective_gas_price").ok();
183 let gas_used = batch.column::<B>("gas_used").ok();
184 let contract_address = batch.column::<B>("contract_address").ok();
185 let logs_bloom = batch.column::<B>("logs_bloom").ok();
186 let kind = batch.column::<UInt8Array>("type").ok();
187 let root = batch.column::<B>("root").ok();
188 let status = batch.column::<UInt8Array>("status").ok();
189 let l1_fee = batch.column::<B>("l1_fee").ok();
190 let l1_gas_price = batch.column::<B>("l1_gas_price").ok();
191 let l1_gas_used = batch.column::<B>("l1_gas_used").ok();
192 let l1_fee_scalar = batch.column::<B>("l1_fee_scalar").ok();
193 let gas_used_for_l1 = batch.column::<B>("gas_used_for_l1").ok();
194
195 (0..batch.chunk.len())
196 .map(|idx| Self {
197 block_hash: map_binary(idx, block_hash),
198 block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
199 from: map_binary(idx, from),
200 gas: map_binary(idx, gas),
201 gas_price: map_binary(idx, gas_price),
202 hash: map_binary(idx, hash),
203 input: map_binary(idx, input),
204 nonce: map_binary(idx, nonce),
205 to: map_binary(idx, to),
206 transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
207 value: map_binary(idx, value),
208 v: map_binary(idx, v),
209 r: map_binary(idx, r),
210 s: map_binary(idx, s),
211 y_parity: map_binary(idx, y_parity),
212 max_priority_fee_per_gas: map_binary(idx, max_priority_fee_per_gas),
213 max_fee_per_gas: map_binary(idx, max_fee_per_gas),
214 chain_id: map_binary(idx, chain_id),
215 access_list: access_list
216 .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
217 authorization_list: authorization_list
218 .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
219 max_fee_per_blob_gas: map_binary(idx, max_fee_per_blob_gas),
220 blob_versioned_hashes: blob_versioned_hashes.and_then(|arr| {
221 arr.get_idx(idx).map(|v| {
222 v.chunks(32)
223 .map(|chunk| chunk.try_into().unwrap())
224 .collect()
225 })
226 }),
227 cumulative_gas_used: map_binary(idx, cumulative_gas_used),
228 effective_gas_price: map_binary(idx, effective_gas_price),
229 gas_used: map_binary(idx, gas_used),
230 contract_address: map_binary(idx, contract_address),
231 logs_bloom: map_binary(idx, logs_bloom),
232 kind: kind.and_then(|arr| arr.get(idx).map(|v| v.into())),
233 root: map_binary(idx, root),
234 status: status.and_then(|arr| {
235 arr.get(idx)
236 .map(|v| hypersync_format::TransactionStatus::from_u8(v).unwrap())
237 }),
238 l1_fee: map_binary(idx, l1_fee),
239 l1_gas_price: map_binary(idx, l1_gas_price),
240 l1_gas_used: map_binary(idx, l1_gas_used),
241 l1_fee_scalar: l1_fee_scalar.and_then(|arr| {
242 arr.get_idx(idx)
243 .map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())
244 }),
245 gas_used_for_l1: map_binary(idx, gas_used_for_l1),
246 })
247 .collect()
248 }
249}
250
251impl FromArrow for Log {
252 fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U>(batch: &ArrowBatch) -> Vec<Self> {
253 let removed = batch.column::<BooleanArray>("removed").ok();
254 let log_index = batch.column::<UInt64Array>("log_index").ok();
255 let transaction_index = batch.column::<UInt64Array>("transaction_index").ok();
256 let transaction_hash = batch.column::<B>("transaction_hash").ok();
257 let block_hash = batch.column::<B>("block_hash").ok();
258 let block_number = batch.column::<UInt64Array>("block_number").ok();
259 let address = batch.column::<B>("address").ok();
260 let data = batch.column::<B>("data").ok();
261 let topic0 = batch.column::<B>("topic0").ok();
262 let topic1 = batch.column::<B>("topic1").ok();
263 let topic2 = batch.column::<B>("topic2").ok();
264 let topic3 = batch.column::<B>("topic3").ok();
265
266 (0..batch.chunk.len())
267 .map(|idx| Self {
268 removed: removed.and_then(|arr| arr.get(idx)),
269 log_index: log_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
270 transaction_index: transaction_index.and_then(|arr| arr.get(idx).map(|v| v.into())),
271 transaction_hash: map_binary(idx, transaction_hash),
272 block_hash: map_binary(idx, block_hash),
273 block_number: block_number.and_then(|arr| arr.get(idx).map(|v| v.into())),
274 address: map_binary(idx, address),
275 data: map_binary(idx, data),
276 topics: {
277 let mut arr = ArrayVec::new();
278
279 arr.push(map_binary(idx, topic0));
280 arr.push(map_binary(idx, topic1));
281 arr.push(map_binary(idx, topic2));
282 arr.push(map_binary(idx, topic3));
283
284 arr
285 },
286 })
287 .collect()
288 }
289}
290
291impl FromArrow for Trace {
292 fn from_arrow_bin_array<B: List<Item = [u8]> + 'static, U: List<Item = str> + 'static>(
293 batch: &ArrowBatch,
294 ) -> Vec<Self> {
295 let from = batch.column::<B>("from").ok();
296 let to = batch.column::<B>("to").ok();
297 let call_type = batch.column::<U>("call_type").ok();
298 let gas = batch.column::<B>("gas").ok();
299 let input = batch.column::<B>("input").ok();
300 let init = batch.column::<B>("init").ok();
301 let value = batch.column::<B>("value").ok();
302 let author = batch.column::<B>("author").ok();
303 let reward_type = batch.column::<U>("reward_type").ok();
304 let block_hash = batch.column::<B>("block_hash").ok();
305 let block_number = batch.column::<UInt64Array>("block_number").ok();
306 let address = batch.column::<B>("address").ok();
307 let code = batch.column::<B>("code").ok();
308 let gas_used = batch.column::<B>("gas_used").ok();
309 let output = batch.column::<B>("output").ok();
310 let subtraces = batch.column::<UInt64Array>("subtraces").ok();
311 let trace_address = batch.column::<B>("trace_address").ok();
312 let transaction_hash = batch.column::<B>("transaction_hash").ok();
313 let transaction_position = batch.column::<UInt64Array>("transaction_position").ok();
314 let kind = batch.column::<U>("type").ok();
315 let error = batch.column::<U>("error").ok();
316
317 (0..batch.chunk.len())
318 .map(|idx| Self {
319 from: map_binary(idx, from),
320 to: map_binary(idx, to),
321 call_type: call_type.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
322 gas: map_binary(idx, gas),
323 input: map_binary(idx, input),
324 init: map_binary(idx, init),
325 value: map_binary(idx, value),
326 author: map_binary(idx, author),
327 reward_type: reward_type.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
328 block_hash: map_binary(idx, block_hash),
329 block_number: block_number.and_then(|arr| arr.get(idx)),
330 address: map_binary(idx, address),
331 code: map_binary(idx, code),
332 gas_used: map_binary(idx, gas_used),
333 output: map_binary(idx, output),
334 subtraces: subtraces.and_then(|arr| arr.get(idx)),
335 trace_address: trace_address
336 .and_then(|arr| arr.get_idx(idx).map(|v| bincode::deserialize(v).unwrap())),
337 transaction_hash: map_binary(idx, transaction_hash),
338 transaction_position: transaction_position.and_then(|arr| arr.get(idx)),
339 kind: kind.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
340 error: error.and_then(|arr| arr.get_idx(idx).map(|v| v.to_owned())),
341 })
342 .collect()
343 }
344}