1use std::sync::Arc;
46
47use anyhow::{anyhow, Context, Result};
48use polars_arrow::array::{new_empty_array, Array, BinaryArray, Utf8Array};
49use polars_arrow::compute;
50use polars_arrow::datatypes::{ArrowDataType as DataType, ArrowSchema as Schema, Field, SchemaRef};
51use polars_arrow::record_batch::RecordBatchT as Chunk;
52
53mod util;
54
55pub use util::project_schema;
56
57pub type ArrowChunk = Chunk<Box<dyn Array>>;
58
59fn hash_dt() -> DataType {
60 DataType::BinaryView
61}
62
63fn addr_dt() -> DataType {
64 DataType::BinaryView
65}
66
67fn quantity_dt() -> DataType {
68 DataType::BinaryView
69}
70
71const NULLABLE: bool = true;
72
73pub fn block_header() -> SchemaRef {
74 Schema::from(vec![
75 Field::new("number", DataType::UInt64, !NULLABLE),
76 Field::new("hash", hash_dt(), !NULLABLE),
77 Field::new("parent_hash", hash_dt(), !NULLABLE),
78 Field::new("nonce", DataType::BinaryView, NULLABLE),
79 Field::new("sha3_uncles", hash_dt(), !NULLABLE),
80 Field::new("logs_bloom", DataType::BinaryView, !NULLABLE),
81 Field::new("transactions_root", hash_dt(), !NULLABLE),
82 Field::new("state_root", hash_dt(), !NULLABLE),
83 Field::new("receipts_root", hash_dt(), !NULLABLE),
84 Field::new("miner", addr_dt(), !NULLABLE),
85 Field::new("difficulty", quantity_dt(), NULLABLE),
86 Field::new("total_difficulty", quantity_dt(), NULLABLE),
87 Field::new("extra_data", DataType::BinaryView, !NULLABLE),
88 Field::new("size", quantity_dt(), !NULLABLE),
89 Field::new("gas_limit", quantity_dt(), !NULLABLE),
90 Field::new("gas_used", quantity_dt(), !NULLABLE),
91 Field::new("timestamp", quantity_dt(), !NULLABLE),
92 Field::new("uncles", DataType::BinaryView, NULLABLE),
93 Field::new("base_fee_per_gas", quantity_dt(), NULLABLE),
94 Field::new("blob_gas_used", quantity_dt(), NULLABLE),
95 Field::new("excess_blob_gas", quantity_dt(), NULLABLE),
96 Field::new("parent_beacon_block_root", hash_dt(), NULLABLE),
97 Field::new("withdrawals_root", hash_dt(), NULLABLE),
98 Field::new("withdrawals", DataType::BinaryView, NULLABLE),
99 Field::new("l1_block_number", DataType::UInt64, NULLABLE),
100 Field::new("send_count", quantity_dt(), NULLABLE),
101 Field::new("send_root", hash_dt(), NULLABLE),
102 Field::new("mix_hash", hash_dt(), NULLABLE),
103 ])
104 .into()
105}
106
107pub fn transaction() -> SchemaRef {
108 Schema::from(vec![
109 Field::new("block_hash", hash_dt(), !NULLABLE),
110 Field::new("block_number", DataType::UInt64, !NULLABLE),
111 Field::new("from", addr_dt(), NULLABLE),
112 Field::new("gas", quantity_dt(), !NULLABLE),
113 Field::new("gas_price", quantity_dt(), NULLABLE),
114 Field::new("hash", hash_dt(), !NULLABLE),
115 Field::new("input", DataType::BinaryView, !NULLABLE),
116 Field::new("nonce", quantity_dt(), !NULLABLE),
117 Field::new("to", addr_dt(), NULLABLE),
118 Field::new("transaction_index", DataType::UInt64, !NULLABLE),
119 Field::new("value", quantity_dt(), !NULLABLE),
120 Field::new("v", quantity_dt(), NULLABLE),
121 Field::new("r", quantity_dt(), NULLABLE),
122 Field::new("s", quantity_dt(), NULLABLE),
123 Field::new("max_priority_fee_per_gas", quantity_dt(), NULLABLE),
124 Field::new("max_fee_per_gas", quantity_dt(), NULLABLE),
125 Field::new("chain_id", quantity_dt(), NULLABLE),
126 Field::new("cumulative_gas_used", quantity_dt(), !NULLABLE),
127 Field::new("effective_gas_price", quantity_dt(), !NULLABLE),
128 Field::new("gas_used", quantity_dt(), !NULLABLE),
129 Field::new("contract_address", addr_dt(), NULLABLE),
130 Field::new("logs_bloom", DataType::BinaryView, !NULLABLE),
131 Field::new("type", DataType::UInt8, NULLABLE),
132 Field::new("root", hash_dt(), NULLABLE),
133 Field::new("status", DataType::UInt8, NULLABLE),
134 Field::new("sighash", DataType::BinaryView, NULLABLE),
135 Field::new("y_parity", quantity_dt(), NULLABLE),
136 Field::new("access_list", DataType::BinaryView, NULLABLE),
137 Field::new("authorization_list", DataType::BinaryView, NULLABLE),
138 Field::new("l1_fee", quantity_dt(), NULLABLE),
139 Field::new("l1_gas_price", quantity_dt(), NULLABLE),
140 Field::new("l1_gas_used", quantity_dt(), NULLABLE),
141 Field::new("l1_fee_scalar", quantity_dt(), NULLABLE),
142 Field::new("gas_used_for_l1", quantity_dt(), NULLABLE),
143 Field::new("max_fee_per_blob_gas", quantity_dt(), NULLABLE),
144 Field::new("blob_versioned_hashes", DataType::BinaryView, NULLABLE),
145 Field::new("deposit_nonce", quantity_dt(), NULLABLE),
146 Field::new("blob_gas_price", quantity_dt(), NULLABLE),
147 Field::new("deposit_receipt_version", quantity_dt(), NULLABLE),
148 Field::new("blob_gas_used", quantity_dt(), NULLABLE),
149 Field::new("l1_base_fee_scalar", quantity_dt(), NULLABLE),
150 Field::new("l1_blob_base_fee", quantity_dt(), NULLABLE),
151 Field::new("l1_blob_base_fee_scalar", quantity_dt(), NULLABLE),
152 Field::new("l1_block_number", quantity_dt(), NULLABLE),
153 Field::new("mint", quantity_dt(), NULLABLE),
154 Field::new("source_hash", hash_dt(), NULLABLE),
155 ])
156 .into()
157}
158
159pub fn log() -> SchemaRef {
160 Schema::from(vec![
161 Field::new("removed", DataType::Boolean, NULLABLE),
162 Field::new("log_index", DataType::UInt64, !NULLABLE),
163 Field::new("transaction_index", DataType::UInt64, !NULLABLE),
164 Field::new("transaction_hash", hash_dt(), !NULLABLE),
165 Field::new("block_hash", hash_dt(), !NULLABLE),
166 Field::new("block_number", DataType::UInt64, !NULLABLE),
167 Field::new("address", addr_dt(), !NULLABLE),
168 Field::new("data", DataType::BinaryView, !NULLABLE),
169 Field::new("topic0", DataType::BinaryView, NULLABLE),
170 Field::new("topic1", DataType::BinaryView, NULLABLE),
171 Field::new("topic2", DataType::BinaryView, NULLABLE),
172 Field::new("topic3", DataType::BinaryView, NULLABLE),
173 ])
174 .into()
175}
176
177pub fn trace() -> SchemaRef {
178 Schema::from(vec![
179 Field::new("from", addr_dt(), NULLABLE),
180 Field::new("to", addr_dt(), NULLABLE),
181 Field::new("call_type", DataType::Utf8View, NULLABLE),
182 Field::new("gas", quantity_dt(), NULLABLE),
183 Field::new("input", DataType::BinaryView, NULLABLE),
184 Field::new("init", DataType::BinaryView, NULLABLE),
185 Field::new("value", quantity_dt(), NULLABLE),
186 Field::new("author", addr_dt(), NULLABLE),
187 Field::new("reward_type", DataType::Utf8View, NULLABLE),
188 Field::new("block_hash", DataType::BinaryView, !NULLABLE),
189 Field::new("block_number", DataType::UInt64, !NULLABLE),
190 Field::new("address", addr_dt(), NULLABLE),
191 Field::new("code", DataType::BinaryView, NULLABLE),
192 Field::new("gas_used", quantity_dt(), NULLABLE),
193 Field::new("output", DataType::BinaryView, NULLABLE),
194 Field::new("subtraces", DataType::UInt64, NULLABLE),
195 Field::new("trace_address", DataType::BinaryView, NULLABLE),
196 Field::new("transaction_hash", DataType::BinaryView, NULLABLE),
197 Field::new("transaction_position", DataType::UInt64, NULLABLE),
198 Field::new("type", DataType::Utf8View, NULLABLE),
199 Field::new("error", DataType::Utf8View, NULLABLE),
200 Field::new("sighash", DataType::BinaryView, NULLABLE),
201 Field::new("action_address", addr_dt(), NULLABLE),
202 Field::new("balance", quantity_dt(), NULLABLE),
203 Field::new("refund_address", addr_dt(), NULLABLE),
204 ])
205 .into()
206}
207
208pub fn concat_chunks(chunks: &[Arc<ArrowChunk>]) -> Result<ArrowChunk> {
209 if chunks.is_empty() {
210 return Err(anyhow!("can't concat 0 chunks"));
211 }
212
213 let num_cols = chunks[0].columns().len();
214
215 let cols = (0..num_cols)
216 .map(|col| {
217 let mut is_utf8 = false;
218 let arrs = chunks
219 .iter()
220 .map(|chunk| {
221 let col = chunk
222 .columns()
223 .get(col)
224 .map(|col| col.as_ref())
225 .context("get column")?;
226 is_utf8 = col.data_type() == &DataType::Utf8;
227 Ok(col)
228 })
229 .collect::<Result<Vec<_>>>()?;
230 if !is_utf8 {
231 compute::concatenate::concatenate(&arrs).context("concat arrays")
232 } else {
233 let arrs = arrs
234 .into_iter()
235 .map(|a| {
236 a.as_any()
237 .downcast_ref::<Utf8Array<i32>>()
238 .unwrap()
239 .to_binary()
240 .boxed()
241 })
242 .collect::<Vec<_>>();
243 let arrs = arrs.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
244 let arr =
245 compute::concatenate::concatenate(arrs.as_slice()).context("concat arrays")?;
246
247 Ok(compute::cast::binary_to_utf8(
248 arr.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
249 DataType::Utf8,
250 )
251 .unwrap()
252 .boxed())
253 }
254 })
255 .collect::<Result<Vec<_>>>()?;
256
257 Ok(ArrowChunk::new(cols))
258}
259
260pub fn empty_chunk(schema: &Schema) -> ArrowChunk {
261 let mut cols = Vec::new();
262 for field in schema.fields.iter() {
263 cols.push(new_empty_array(field.data_type().clone()));
264 }
265 ArrowChunk::new(cols)
266}
267
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use polars_arrow::array::PrimitiveArray;

    use super::*;

    /// Every schema constructor must succeed and must not repeat a column name.
    #[test]
    fn smoke_test_schema_constructors() {
        for schema in [block_header(), transaction(), log(), trace()] {
            let names = schema.fields.iter().map(|f| &f.name).collect::<HashSet<_>>();
            assert_eq!(
                names.len(),
                schema.fields.len(),
                "duplicate column name in schema"
            );
        }
    }

    #[test]
    fn test_concat_utf8() {
        let chunks = [
            Arc::new(Chunk::new(vec![Utf8Array::<i32>::from(&[Some(
                "hello".to_owned(),
            )])
            .boxed()])),
            Arc::new(Chunk::new(vec![Utf8Array::<i32>::from(&[Some(
                "world".to_owned(),
            )])
            .boxed()])),
        ];

        let out = concat_chunks(&chunks).unwrap();

        assert_eq!(
            out,
            ArrowChunk::new(vec![Utf8Array::<i32>::from(&[
                Some("hello".to_owned()),
                Some("world".to_owned())
            ])
            .boxed()])
        )
    }

    /// Non-Utf8 columns go through the plain `concatenate` path.
    #[test]
    fn test_concat_primitive() {
        let chunks = [
            Arc::new(Chunk::new(vec![
                PrimitiveArray::<u64>::from_slice([1, 2]).boxed()
            ])),
            Arc::new(Chunk::new(vec![PrimitiveArray::<u64>::from_slice([3]).boxed()])),
        ];

        let out = concat_chunks(&chunks).unwrap();

        assert_eq!(
            out,
            ArrowChunk::new(vec![PrimitiveArray::<u64>::from_slice([1, 2, 3]).boxed()])
        );
    }

    #[test]
    fn test_concat_no_chunks_is_error() {
        assert!(concat_chunks(&[]).is_err());
    }

    /// `empty_chunk` yields one zero-length column per field, with matching types.
    #[test]
    fn test_empty_chunk_matches_schema() {
        let schema = log();
        let chunk = empty_chunk(&schema);

        assert_eq!(chunk.columns().len(), schema.fields.len());
        for (col, field) in chunk.columns().iter().zip(schema.fields.iter()) {
            assert_eq!(col.len(), 0);
            assert_eq!(col.data_type(), field.data_type());
        }
    }
}