skar_schema/
lib.rs

1use std::sync::Arc;
2
3use anyhow::{anyhow, Context, Result};
4use polars_arrow::array::{new_empty_array, Array};
5use polars_arrow::compute;
6use polars_arrow::datatypes::{ArrowDataType as DataType, ArrowSchema as Schema, Field, SchemaRef};
7use polars_arrow::record_batch::RecordBatch as Chunk;
8
9mod util;
10
11pub use util::project_schema;
12
13pub type ArrowChunk = Chunk<Box<dyn Array>>;
14
15fn hash_dt() -> DataType {
16    DataType::BinaryView
17}
18
19fn addr_dt() -> DataType {
20    DataType::BinaryView
21}
22
23fn quantity_dt() -> DataType {
24    DataType::BinaryView
25}
26
27pub fn block_header() -> SchemaRef {
28    Schema::from(vec![
29        Field::new("number", DataType::UInt64, false),
30        Field::new("hash", hash_dt(), false),
31        Field::new("parent_hash", hash_dt(), false),
32        Field::new("nonce", DataType::BinaryView, true),
33        Field::new("sha3_uncles", hash_dt(), false),
34        Field::new("logs_bloom", DataType::BinaryView, false),
35        Field::new("transactions_root", hash_dt(), false),
36        Field::new("state_root", hash_dt(), false),
37        Field::new("receipts_root", hash_dt(), false),
38        Field::new("miner", addr_dt(), false),
39        Field::new("difficulty", quantity_dt(), true),
40        Field::new("total_difficulty", quantity_dt(), true),
41        Field::new("extra_data", DataType::BinaryView, false),
42        Field::new("size", quantity_dt(), false),
43        Field::new("gas_limit", quantity_dt(), false),
44        Field::new("gas_used", quantity_dt(), false),
45        Field::new("timestamp", quantity_dt(), false),
46        Field::new("uncles", DataType::BinaryView, true),
47        Field::new("base_fee_per_gas", quantity_dt(), true),
48        Field::new("blob_gas_used", quantity_dt(), true),
49        Field::new("excess_blob_gas", quantity_dt(), true),
50        Field::new("parent_beacon_block_root", hash_dt(), true),
51        Field::new("withdrawals_root", hash_dt(), true),
52        Field::new("withdrawals", DataType::BinaryView, true),
53        Field::new("l1_block_number", DataType::UInt64, true),
54        Field::new("send_count", quantity_dt(), true),
55        Field::new("send_root", hash_dt(), true),
56        Field::new("mix_hash", hash_dt(), true),
57    ])
58    .into()
59}
60
61pub fn transaction() -> SchemaRef {
62    Schema::from(vec![
63        Field::new("block_hash", hash_dt(), false),
64        Field::new("block_number", DataType::UInt64, false),
65        Field::new("from", addr_dt(), true),
66        Field::new("gas", quantity_dt(), false),
67        Field::new("gas_price", quantity_dt(), true),
68        Field::new("hash", hash_dt(), false),
69        Field::new("input", DataType::BinaryView, false),
70        Field::new("nonce", quantity_dt(), false),
71        Field::new("to", addr_dt(), true),
72        Field::new("transaction_index", DataType::UInt64, false),
73        Field::new("value", quantity_dt(), false),
74        Field::new("v", quantity_dt(), true),
75        Field::new("r", quantity_dt(), true),
76        Field::new("s", quantity_dt(), true),
77        Field::new("max_priority_fee_per_gas", quantity_dt(), true),
78        Field::new("max_fee_per_gas", quantity_dt(), true),
79        Field::new("chain_id", quantity_dt(), true),
80        Field::new("cumulative_gas_used", quantity_dt(), false),
81        Field::new("effective_gas_price", quantity_dt(), false),
82        Field::new("gas_used", quantity_dt(), false),
83        Field::new("contract_address", addr_dt(), true),
84        Field::new("logs_bloom", DataType::BinaryView, false),
85        Field::new("type", DataType::UInt8, true),
86        Field::new("root", hash_dt(), true),
87        Field::new("status", DataType::UInt8, true),
88        Field::new("sighash", DataType::BinaryView, true),
89        Field::new("tx_y_parity", quantity_dt(), true),
90        Field::new("tx_access_list", DataType::BinaryView, true),
91        Field::new("tx_l1_fee", quantity_dt(), true),
92        Field::new("tx_l1_gas_price", quantity_dt(), true),
93        Field::new("tx_l1_gas_used", quantity_dt(), true),
94        Field::new("tx_l1_fee_scalar", quantity_dt(), true),
95        Field::new("tx_gas_used_for_l1", quantity_dt(), true),
96        Field::new("max_fee_per_blob_gas", quantity_dt(), true),
97        Field::new("blob_versioned_hashes", DataType::BinaryView, true),
98    ])
99    .into()
100}
101
102pub fn log() -> SchemaRef {
103    Schema::from(vec![
104        Field::new("removed", DataType::Boolean, true),
105        Field::new("log_index", DataType::UInt64, false),
106        Field::new("transaction_index", DataType::UInt64, false),
107        Field::new("transaction_hash", hash_dt(), false),
108        Field::new("block_hash", hash_dt(), false),
109        Field::new("block_number", DataType::UInt64, false),
110        Field::new("address", addr_dt(), false),
111        Field::new("data", DataType::BinaryView, false),
112        Field::new("topic0", DataType::BinaryView, true),
113        Field::new("topic1", DataType::BinaryView, true),
114        Field::new("topic2", DataType::BinaryView, true),
115        Field::new("topic3", DataType::BinaryView, true),
116    ])
117    .into()
118}
119
120pub fn trace() -> SchemaRef {
121    Schema::from(vec![
122        Field::new("from", addr_dt(), true),
123        Field::new("to", addr_dt(), true),
124        Field::new("call_type", DataType::Utf8View, true),
125        Field::new("gas", quantity_dt(), true),
126        Field::new("input", DataType::BinaryView, true),
127        Field::new("init", DataType::BinaryView, true),
128        Field::new("value", quantity_dt(), true),
129        Field::new("author", addr_dt(), true),
130        Field::new("reward_type", DataType::Utf8View, true),
131        Field::new("block_hash", DataType::BinaryView, false),
132        Field::new("block_number", DataType::UInt64, false),
133        Field::new("address", addr_dt(), true),
134        Field::new("code", DataType::BinaryView, true),
135        Field::new("gas_used", quantity_dt(), true),
136        Field::new("output", DataType::BinaryView, true),
137        Field::new("subtraces", DataType::UInt64, true),
138        Field::new("trace_address", DataType::BinaryView, true),
139        Field::new("transaction_hash", DataType::BinaryView, true),
140        Field::new("transaction_position", DataType::UInt64, true),
141        Field::new("type", DataType::Utf8View, true),
142        Field::new("error", DataType::Utf8View, true),
143        Field::new("sighash", DataType::BinaryView, true),
144    ])
145    .into()
146}
147
148pub fn concat_chunks(chunks: &[Arc<ArrowChunk>]) -> Result<ArrowChunk> {
149    if chunks.is_empty() {
150        return Err(anyhow!("can't concat 0 chunks"));
151    }
152
153    let num_cols = chunks[0].columns().len();
154
155    let cols = (0..num_cols)
156        .map(|col| {
157            let arrs = chunks
158                .iter()
159                .map(|chunk| {
160                    chunk
161                        .columns()
162                        .get(col)
163                        .map(|col| col.as_ref())
164                        .context("get column")
165                })
166                .collect::<Result<Vec<_>>>()?;
167            compute::concatenate::concatenate(&arrs).context("concat arrays")
168        })
169        .collect::<Result<Vec<_>>>()?;
170
171    Ok(ArrowChunk::new(cols))
172}
173
174pub fn empty_chunk(schema: &Schema) -> ArrowChunk {
175    let mut cols = Vec::new();
176    for field in schema.fields.iter() {
177        cols.push(new_empty_array(field.data_type().clone()));
178    }
179    ArrowChunk::new(cols)
180}
181
182#[cfg(test)]
183mod tests {
184    use super::*;
185
186    #[test]
187    fn smoke_test_schema_constructors() {
188        block_header();
189        transaction();
190        log();
191    }
192}