1use std::sync::Arc;
2
3use anyhow::{anyhow, Context, Result};
4use polars_arrow::array::{new_empty_array, Array, BinaryArray, Utf8Array};
5use polars_arrow::compute;
6use polars_arrow::datatypes::{ArrowDataType as DataType, ArrowSchema as Schema, Field, SchemaRef};
7use polars_arrow::record_batch::RecordBatchT as Chunk;
8
9mod util;
10
11pub use util::project_schema;
12
13pub type ArrowChunk = Chunk<Box<dyn Array>>;
14
15fn hash_dt() -> DataType {
16 DataType::BinaryView
17}
18
19fn addr_dt() -> DataType {
20 DataType::BinaryView
21}
22
23fn quantity_dt() -> DataType {
24 DataType::BinaryView
25}
26
27pub fn block_header() -> SchemaRef {
28 Schema::from(vec![
29 Field::new("number", DataType::UInt64, false),
30 Field::new("hash", hash_dt(), false),
31 Field::new("parent_hash", hash_dt(), false),
32 Field::new("nonce", DataType::BinaryView, true),
33 Field::new("sha3_uncles", hash_dt(), false),
34 Field::new("logs_bloom", DataType::BinaryView, false),
35 Field::new("transactions_root", hash_dt(), false),
36 Field::new("state_root", hash_dt(), false),
37 Field::new("receipts_root", hash_dt(), false),
38 Field::new("miner", addr_dt(), false),
39 Field::new("difficulty", quantity_dt(), true),
40 Field::new("total_difficulty", quantity_dt(), true),
41 Field::new("extra_data", DataType::BinaryView, false),
42 Field::new("size", quantity_dt(), false),
43 Field::new("gas_limit", quantity_dt(), false),
44 Field::new("gas_used", quantity_dt(), false),
45 Field::new("timestamp", quantity_dt(), false),
46 Field::new("uncles", DataType::BinaryView, true),
47 Field::new("base_fee_per_gas", quantity_dt(), true),
48 Field::new("blob_gas_used", quantity_dt(), true),
49 Field::new("excess_blob_gas", quantity_dt(), true),
50 Field::new("parent_beacon_block_root", hash_dt(), true),
51 Field::new("withdrawals_root", hash_dt(), true),
52 Field::new("withdrawals", DataType::BinaryView, true),
53 Field::new("l1_block_number", DataType::UInt64, true),
54 Field::new("send_count", quantity_dt(), true),
55 Field::new("send_root", hash_dt(), true),
56 Field::new("mix_hash", hash_dt(), true),
57 ])
58 .into()
59}
60
61pub fn transaction() -> SchemaRef {
62 Schema::from(vec![
63 Field::new("block_hash", hash_dt(), false),
64 Field::new("block_number", DataType::UInt64, false),
65 Field::new("from", addr_dt(), true),
66 Field::new("gas", quantity_dt(), false),
67 Field::new("gas_price", quantity_dt(), true),
68 Field::new("hash", hash_dt(), false),
69 Field::new("input", DataType::BinaryView, false),
70 Field::new("nonce", quantity_dt(), false),
71 Field::new("to", addr_dt(), true),
72 Field::new("transaction_index", DataType::UInt64, false),
73 Field::new("value", quantity_dt(), false),
74 Field::new("v", quantity_dt(), true),
75 Field::new("r", quantity_dt(), true),
76 Field::new("s", quantity_dt(), true),
77 Field::new("max_priority_fee_per_gas", quantity_dt(), true),
78 Field::new("max_fee_per_gas", quantity_dt(), true),
79 Field::new("chain_id", quantity_dt(), true),
80 Field::new("cumulative_gas_used", quantity_dt(), false),
81 Field::new("effective_gas_price", quantity_dt(), false),
82 Field::new("gas_used", quantity_dt(), false),
83 Field::new("contract_address", addr_dt(), true),
84 Field::new("logs_bloom", DataType::BinaryView, false),
85 Field::new("type", DataType::UInt8, true),
86 Field::new("root", hash_dt(), true),
87 Field::new("status", DataType::UInt8, true),
88 Field::new("sighash", DataType::BinaryView, true),
89 Field::new("y_parity", quantity_dt(), true),
90 Field::new("access_list", DataType::BinaryView, true),
91 Field::new("l1_fee", quantity_dt(), true),
92 Field::new("l1_gas_price", quantity_dt(), true),
93 Field::new("l1_gas_used", quantity_dt(), true),
94 Field::new("l1_fee_scalar", quantity_dt(), true),
95 Field::new("gas_used_for_l1", quantity_dt(), true),
96 Field::new("max_fee_per_blob_gas", quantity_dt(), true),
97 Field::new("blob_versioned_hashes", DataType::BinaryView, true),
98 Field::new("deposit_nonce", quantity_dt(), true),
99 Field::new("blob_gas_price", quantity_dt(), true),
100 Field::new("deposit_receipt_version", quantity_dt(), true),
101 Field::new("blob_gas_used", quantity_dt(), true),
102 Field::new("l1_base_fee_scalar", quantity_dt(), true),
103 Field::new("l1_blob_base_fee", quantity_dt(), true),
104 Field::new("l1_blob_base_fee_scalar", quantity_dt(), true),
105 Field::new("l1_block_number", quantity_dt(), true),
106 Field::new("mint", quantity_dt(), true),
107 Field::new("source_hash", hash_dt(), true),
108 ])
109 .into()
110}
111
112pub fn log() -> SchemaRef {
113 Schema::from(vec![
114 Field::new("removed", DataType::Boolean, true),
115 Field::new("log_index", DataType::UInt64, false),
116 Field::new("transaction_index", DataType::UInt64, false),
117 Field::new("transaction_hash", hash_dt(), false),
118 Field::new("block_hash", hash_dt(), false),
119 Field::new("block_number", DataType::UInt64, false),
120 Field::new("address", addr_dt(), false),
121 Field::new("data", DataType::BinaryView, false),
122 Field::new("topic0", DataType::BinaryView, true),
123 Field::new("topic1", DataType::BinaryView, true),
124 Field::new("topic2", DataType::BinaryView, true),
125 Field::new("topic3", DataType::BinaryView, true),
126 ])
127 .into()
128}
129
130pub fn trace() -> SchemaRef {
131 Schema::from(vec![
132 Field::new("from", addr_dt(), true),
133 Field::new("to", addr_dt(), true),
134 Field::new("call_type", DataType::Utf8View, true),
135 Field::new("gas", quantity_dt(), true),
136 Field::new("input", DataType::BinaryView, true),
137 Field::new("init", DataType::BinaryView, true),
138 Field::new("value", quantity_dt(), true),
139 Field::new("author", addr_dt(), true),
140 Field::new("reward_type", DataType::Utf8View, true),
141 Field::new("block_hash", DataType::BinaryView, false),
142 Field::new("block_number", DataType::UInt64, false),
143 Field::new("address", addr_dt(), true),
144 Field::new("code", DataType::BinaryView, true),
145 Field::new("gas_used", quantity_dt(), true),
146 Field::new("output", DataType::BinaryView, true),
147 Field::new("subtraces", DataType::UInt64, true),
148 Field::new("trace_address", DataType::BinaryView, true),
149 Field::new("transaction_hash", DataType::BinaryView, true),
150 Field::new("transaction_position", DataType::UInt64, true),
151 Field::new("type", DataType::Utf8View, true),
152 Field::new("error", DataType::Utf8View, true),
153 Field::new("sighash", DataType::BinaryView, true),
154 Field::new("action_address", addr_dt(), true),
155 Field::new("balance", quantity_dt(), true),
156 Field::new("refund_address", addr_dt(), true),
157 ])
158 .into()
159}
160
161pub fn concat_chunks(chunks: &[Arc<ArrowChunk>]) -> Result<ArrowChunk> {
162 if chunks.is_empty() {
163 return Err(anyhow!("can't concat 0 chunks"));
164 }
165
166 let num_cols = chunks[0].columns().len();
167
168 let cols = (0..num_cols)
169 .map(|col| {
170 let mut is_utf8 = false;
171 let arrs = chunks
172 .iter()
173 .map(|chunk| {
174 let col = chunk
175 .columns()
176 .get(col)
177 .map(|col| col.as_ref())
178 .context("get column")?;
179 is_utf8 = col.data_type() == &DataType::Utf8;
180 Ok(col)
181 })
182 .collect::<Result<Vec<_>>>()?;
183 if !is_utf8 {
184 compute::concatenate::concatenate(&arrs).context("concat arrays")
185 } else {
186 let arrs = arrs
187 .into_iter()
188 .map(|a| {
189 a.as_any()
190 .downcast_ref::<Utf8Array<i32>>()
191 .unwrap()
192 .to_binary()
193 .boxed()
194 })
195 .collect::<Vec<_>>();
196 let arrs = arrs.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
197 let arr =
198 compute::concatenate::concatenate(arrs.as_slice()).context("concat arrays")?;
199
200 Ok(compute::cast::binary_to_utf8(
201 arr.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
202 DataType::Utf8,
203 )
204 .unwrap()
205 .boxed())
206 }
207 })
208 .collect::<Result<Vec<_>>>()?;
209
210 Ok(ArrowChunk::new(cols))
211}
212
213pub fn empty_chunk(schema: &Schema) -> ArrowChunk {
214 let mut cols = Vec::new();
215 for field in schema.fields.iter() {
216 cols.push(new_empty_array(field.data_type().clone()));
217 }
218 ArrowChunk::new(cols)
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224
225 #[test]
226 fn smoke_test_schema_constructors() {
227 block_header();
228 transaction();
229 log();
230 trace();
231 }
232
233 #[test]
234 fn test_concat_utf8() {
235 let chunks = [
236 Arc::new(Chunk::new(vec![Utf8Array::<i32>::from(&[Some(
237 "hello".to_owned(),
238 )])
239 .boxed()])),
240 Arc::new(Chunk::new(vec![Utf8Array::<i32>::from(&[Some(
241 "world".to_owned(),
242 )])
243 .boxed()])),
244 ];
245
246 let out = concat_chunks(&chunks).unwrap();
247
248 assert_eq!(
249 out,
250 ArrowChunk::new(vec![Utf8Array::<i32>::from(&[
251 Some("hello".to_owned()),
252 Some("world".to_owned())
253 ])
254 .boxed(),])
255 )
256 }
257}