1use std::sync::Arc;
2
3use anyhow::{anyhow, Context, Result};
4use polars_arrow::array::{new_empty_array, Array};
5use polars_arrow::compute;
6use polars_arrow::datatypes::{ArrowDataType as DataType, ArrowSchema as Schema, Field, SchemaRef};
7use polars_arrow::record_batch::RecordBatch as Chunk;
8
9mod util;
10
11pub use util::project_schema;
12
13pub type ArrowChunk = Chunk<Box<dyn Array>>;
14
15fn hash_dt() -> DataType {
16 DataType::BinaryView
17}
18
19fn addr_dt() -> DataType {
20 DataType::BinaryView
21}
22
23fn quantity_dt() -> DataType {
24 DataType::BinaryView
25}
26
27pub fn block_header() -> SchemaRef {
28 Schema::from(vec![
29 Field::new("number", DataType::UInt64, false),
30 Field::new("hash", hash_dt(), false),
31 Field::new("parent_hash", hash_dt(), false),
32 Field::new("nonce", DataType::BinaryView, true),
33 Field::new("sha3_uncles", hash_dt(), false),
34 Field::new("logs_bloom", DataType::BinaryView, false),
35 Field::new("transactions_root", hash_dt(), false),
36 Field::new("state_root", hash_dt(), false),
37 Field::new("receipts_root", hash_dt(), false),
38 Field::new("miner", addr_dt(), false),
39 Field::new("difficulty", quantity_dt(), true),
40 Field::new("total_difficulty", quantity_dt(), true),
41 Field::new("extra_data", DataType::BinaryView, false),
42 Field::new("size", quantity_dt(), false),
43 Field::new("gas_limit", quantity_dt(), false),
44 Field::new("gas_used", quantity_dt(), false),
45 Field::new("timestamp", quantity_dt(), false),
46 Field::new("uncles", DataType::BinaryView, true),
47 Field::new("base_fee_per_gas", quantity_dt(), true),
48 Field::new("blob_gas_used", quantity_dt(), true),
49 Field::new("excess_blob_gas", quantity_dt(), true),
50 Field::new("parent_beacon_block_root", hash_dt(), true),
51 Field::new("withdrawals_root", hash_dt(), true),
52 Field::new("withdrawals", DataType::BinaryView, true),
53 Field::new("l1_block_number", DataType::UInt64, true),
54 Field::new("send_count", quantity_dt(), true),
55 Field::new("send_root", hash_dt(), true),
56 Field::new("mix_hash", hash_dt(), true),
57 ])
58 .into()
59}
60
61pub fn transaction() -> SchemaRef {
62 Schema::from(vec![
63 Field::new("block_hash", hash_dt(), false),
64 Field::new("block_number", DataType::UInt64, false),
65 Field::new("from", addr_dt(), true),
66 Field::new("gas", quantity_dt(), false),
67 Field::new("gas_price", quantity_dt(), true),
68 Field::new("hash", hash_dt(), false),
69 Field::new("input", DataType::BinaryView, false),
70 Field::new("nonce", quantity_dt(), false),
71 Field::new("to", addr_dt(), true),
72 Field::new("transaction_index", DataType::UInt64, false),
73 Field::new("value", quantity_dt(), false),
74 Field::new("v", quantity_dt(), true),
75 Field::new("r", quantity_dt(), true),
76 Field::new("s", quantity_dt(), true),
77 Field::new("max_priority_fee_per_gas", quantity_dt(), true),
78 Field::new("max_fee_per_gas", quantity_dt(), true),
79 Field::new("chain_id", quantity_dt(), true),
80 Field::new("cumulative_gas_used", quantity_dt(), false),
81 Field::new("effective_gas_price", quantity_dt(), false),
82 Field::new("gas_used", quantity_dt(), false),
83 Field::new("contract_address", addr_dt(), true),
84 Field::new("logs_bloom", DataType::BinaryView, false),
85 Field::new("type", DataType::UInt8, true),
86 Field::new("root", hash_dt(), true),
87 Field::new("status", DataType::UInt8, true),
88 Field::new("sighash", DataType::BinaryView, true),
89 Field::new("tx_y_parity", quantity_dt(), true),
90 Field::new("tx_access_list", DataType::BinaryView, true),
91 Field::new("tx_l1_fee", quantity_dt(), true),
92 Field::new("tx_l1_gas_price", quantity_dt(), true),
93 Field::new("tx_l1_gas_used", quantity_dt(), true),
94 Field::new("tx_l1_fee_scalar", quantity_dt(), true),
95 Field::new("tx_gas_used_for_l1", quantity_dt(), true),
96 Field::new("max_fee_per_blob_gas", quantity_dt(), true),
97 Field::new("blob_versioned_hashes", DataType::BinaryView, true),
98 ])
99 .into()
100}
101
102pub fn log() -> SchemaRef {
103 Schema::from(vec![
104 Field::new("removed", DataType::Boolean, true),
105 Field::new("log_index", DataType::UInt64, false),
106 Field::new("transaction_index", DataType::UInt64, false),
107 Field::new("transaction_hash", hash_dt(), false),
108 Field::new("block_hash", hash_dt(), false),
109 Field::new("block_number", DataType::UInt64, false),
110 Field::new("address", addr_dt(), false),
111 Field::new("data", DataType::BinaryView, false),
112 Field::new("topic0", DataType::BinaryView, true),
113 Field::new("topic1", DataType::BinaryView, true),
114 Field::new("topic2", DataType::BinaryView, true),
115 Field::new("topic3", DataType::BinaryView, true),
116 ])
117 .into()
118}
119
120pub fn trace() -> SchemaRef {
121 Schema::from(vec![
122 Field::new("from", addr_dt(), true),
123 Field::new("to", addr_dt(), true),
124 Field::new("call_type", DataType::Utf8View, true),
125 Field::new("gas", quantity_dt(), true),
126 Field::new("input", DataType::BinaryView, true),
127 Field::new("init", DataType::BinaryView, true),
128 Field::new("value", quantity_dt(), true),
129 Field::new("author", addr_dt(), true),
130 Field::new("reward_type", DataType::Utf8View, true),
131 Field::new("block_hash", DataType::BinaryView, false),
132 Field::new("block_number", DataType::UInt64, false),
133 Field::new("address", addr_dt(), true),
134 Field::new("code", DataType::BinaryView, true),
135 Field::new("gas_used", quantity_dt(), true),
136 Field::new("output", DataType::BinaryView, true),
137 Field::new("subtraces", DataType::UInt64, true),
138 Field::new("trace_address", DataType::BinaryView, true),
139 Field::new("transaction_hash", DataType::BinaryView, true),
140 Field::new("transaction_position", DataType::UInt64, true),
141 Field::new("type", DataType::Utf8View, true),
142 Field::new("error", DataType::Utf8View, true),
143 Field::new("sighash", DataType::BinaryView, true),
144 ])
145 .into()
146}
147
148pub fn concat_chunks(chunks: &[Arc<ArrowChunk>]) -> Result<ArrowChunk> {
149 if chunks.is_empty() {
150 return Err(anyhow!("can't concat 0 chunks"));
151 }
152
153 let num_cols = chunks[0].columns().len();
154
155 let cols = (0..num_cols)
156 .map(|col| {
157 let arrs = chunks
158 .iter()
159 .map(|chunk| {
160 chunk
161 .columns()
162 .get(col)
163 .map(|col| col.as_ref())
164 .context("get column")
165 })
166 .collect::<Result<Vec<_>>>()?;
167 compute::concatenate::concatenate(&arrs).context("concat arrays")
168 })
169 .collect::<Result<Vec<_>>>()?;
170
171 Ok(ArrowChunk::new(cols))
172}
173
174pub fn empty_chunk(schema: &Schema) -> ArrowChunk {
175 let mut cols = Vec::new();
176 for field in schema.fields.iter() {
177 cols.push(new_empty_array(field.data_type().clone()));
178 }
179 ArrowChunk::new(cols)
180}
181
182#[cfg(test)]
183mod tests {
184 use super::*;
185
186 #[test]
187 fn smoke_test_schema_constructors() {
188 block_header();
189 transaction();
190 log();
191 }
192}