hypersync_schema/
lib.rs

1//! # HyperSync Schema
2//!
3//! Apache Arrow schemas and data transformation utilities for the HyperSync protocol.
4//!
5//! This crate provides the Arrow schema definitions and data transformation
6//! utilities used by HyperSync for high-performance columnar data processing.
7//! It bridges the gap between HyperSync's native data formats and Apache Arrow.
8//!
9//! ## Features
10//!
11//! - **Arrow schemas**: Predefined schemas for blocks, transactions, logs, and traces
12//! - **Data transformation**: Utilities for converting between formats
13//! - **High performance**: Optimized columnar data operations
14//! - **Schema projection**: Select only needed columns for memory efficiency
15//!
16//! ## Key Functions
17//!
//! - [`block_header()`] - Get the Arrow schema for block header data
//! - [`transaction()`] - Get the Arrow schema for transaction data
//! - [`log()`] - Get the Arrow schema for log/event data
//! - [`trace()`] - Get the Arrow schema for trace data
//! - [`project_schema()`] - Project a schema down to a subset of columns
//! - [`concat_chunks()`] - Concatenate Arrow chunks column-by-column
//! - [`empty_chunk()`] - Build a zero-row chunk matching a schema
24//!
25//! ## Example
26//!
27//! ```
28//! use hypersync_schema::{transaction, log, project_schema};
29//! use std::collections::BTreeSet;
30//!
31//! // Get schema for transaction data
32//! let tx_schema = transaction();
33//! println!("Transaction schema has {} fields", tx_schema.fields.len());
34//!
35//! // Get schema for log data  
36//! let log_schema = log();
37//! println!("Log schema has {} fields", log_schema.fields.len());
38//!
39//! // Project to subset of fields
40//! let fields: BTreeSet<String> = ["hash", "from"].iter().map(|s| s.to_string()).collect();
41//! let projected = project_schema(&tx_schema, &fields);
42//! println!("Projected schema has {} fields", projected.fields.len());
43//! ```
44
45use std::sync::Arc;
46
47use anyhow::{anyhow, Context, Result};
48use polars_arrow::array::{new_empty_array, Array, BinaryArray, Utf8Array};
49use polars_arrow::compute;
50use polars_arrow::datatypes::{ArrowDataType as DataType, ArrowSchema as Schema, Field, SchemaRef};
51use polars_arrow::record_batch::RecordBatchT as Chunk;
52
53mod util;
54
55pub use util::project_schema;
56
57pub type ArrowChunk = Chunk<Box<dyn Array>>;
58
59fn hash_dt() -> DataType {
60    DataType::BinaryView
61}
62
63fn addr_dt() -> DataType {
64    DataType::BinaryView
65}
66
67fn quantity_dt() -> DataType {
68    DataType::BinaryView
69}
70
71const NULLABLE: bool = true;
72
73pub fn block_header() -> SchemaRef {
74    Schema::from(vec![
75        Field::new("number", DataType::UInt64, !NULLABLE),
76        Field::new("hash", hash_dt(), !NULLABLE),
77        Field::new("parent_hash", hash_dt(), !NULLABLE),
78        Field::new("nonce", DataType::BinaryView, NULLABLE),
79        Field::new("sha3_uncles", hash_dt(), !NULLABLE),
80        Field::new("logs_bloom", DataType::BinaryView, !NULLABLE),
81        Field::new("transactions_root", hash_dt(), !NULLABLE),
82        Field::new("state_root", hash_dt(), !NULLABLE),
83        Field::new("receipts_root", hash_dt(), !NULLABLE),
84        Field::new("miner", addr_dt(), !NULLABLE),
85        Field::new("difficulty", quantity_dt(), NULLABLE),
86        Field::new("total_difficulty", quantity_dt(), NULLABLE),
87        Field::new("extra_data", DataType::BinaryView, !NULLABLE),
88        Field::new("size", quantity_dt(), !NULLABLE),
89        Field::new("gas_limit", quantity_dt(), !NULLABLE),
90        Field::new("gas_used", quantity_dt(), !NULLABLE),
91        Field::new("timestamp", quantity_dt(), !NULLABLE),
92        Field::new("uncles", DataType::BinaryView, NULLABLE),
93        Field::new("base_fee_per_gas", quantity_dt(), NULLABLE),
94        Field::new("blob_gas_used", quantity_dt(), NULLABLE),
95        Field::new("excess_blob_gas", quantity_dt(), NULLABLE),
96        Field::new("parent_beacon_block_root", hash_dt(), NULLABLE),
97        Field::new("withdrawals_root", hash_dt(), NULLABLE),
98        Field::new("withdrawals", DataType::BinaryView, NULLABLE),
99        Field::new("l1_block_number", DataType::UInt64, NULLABLE),
100        Field::new("send_count", quantity_dt(), NULLABLE),
101        Field::new("send_root", hash_dt(), NULLABLE),
102        Field::new("mix_hash", hash_dt(), NULLABLE),
103    ])
104    .into()
105}
106
107pub fn transaction() -> SchemaRef {
108    Schema::from(vec![
109        Field::new("block_hash", hash_dt(), !NULLABLE),
110        Field::new("block_number", DataType::UInt64, !NULLABLE),
111        Field::new("from", addr_dt(), NULLABLE),
112        Field::new("gas", quantity_dt(), !NULLABLE),
113        Field::new("gas_price", quantity_dt(), NULLABLE),
114        Field::new("hash", hash_dt(), !NULLABLE),
115        Field::new("input", DataType::BinaryView, !NULLABLE),
116        Field::new("nonce", quantity_dt(), !NULLABLE),
117        Field::new("to", addr_dt(), NULLABLE),
118        Field::new("transaction_index", DataType::UInt64, !NULLABLE),
119        Field::new("value", quantity_dt(), !NULLABLE),
120        Field::new("v", quantity_dt(), NULLABLE),
121        Field::new("r", quantity_dt(), NULLABLE),
122        Field::new("s", quantity_dt(), NULLABLE),
123        Field::new("max_priority_fee_per_gas", quantity_dt(), NULLABLE),
124        Field::new("max_fee_per_gas", quantity_dt(), NULLABLE),
125        Field::new("chain_id", quantity_dt(), NULLABLE),
126        Field::new("cumulative_gas_used", quantity_dt(), !NULLABLE),
127        Field::new("effective_gas_price", quantity_dt(), !NULLABLE),
128        Field::new("gas_used", quantity_dt(), !NULLABLE),
129        Field::new("contract_address", addr_dt(), NULLABLE),
130        Field::new("logs_bloom", DataType::BinaryView, !NULLABLE),
131        Field::new("type", DataType::UInt8, NULLABLE),
132        Field::new("root", hash_dt(), NULLABLE),
133        Field::new("status", DataType::UInt8, NULLABLE),
134        Field::new("sighash", DataType::BinaryView, NULLABLE),
135        Field::new("y_parity", quantity_dt(), NULLABLE),
136        Field::new("access_list", DataType::BinaryView, NULLABLE),
137        Field::new("authorization_list", DataType::BinaryView, NULLABLE),
138        Field::new("l1_fee", quantity_dt(), NULLABLE),
139        Field::new("l1_gas_price", quantity_dt(), NULLABLE),
140        Field::new("l1_gas_used", quantity_dt(), NULLABLE),
141        Field::new("l1_fee_scalar", quantity_dt(), NULLABLE),
142        Field::new("gas_used_for_l1", quantity_dt(), NULLABLE),
143        Field::new("max_fee_per_blob_gas", quantity_dt(), NULLABLE),
144        Field::new("blob_versioned_hashes", DataType::BinaryView, NULLABLE),
145        Field::new("deposit_nonce", quantity_dt(), NULLABLE),
146        Field::new("blob_gas_price", quantity_dt(), NULLABLE),
147        Field::new("deposit_receipt_version", quantity_dt(), NULLABLE),
148        Field::new("blob_gas_used", quantity_dt(), NULLABLE),
149        Field::new("l1_base_fee_scalar", quantity_dt(), NULLABLE),
150        Field::new("l1_blob_base_fee", quantity_dt(), NULLABLE),
151        Field::new("l1_blob_base_fee_scalar", quantity_dt(), NULLABLE),
152        Field::new("l1_block_number", quantity_dt(), NULLABLE),
153        Field::new("mint", quantity_dt(), NULLABLE),
154        Field::new("source_hash", hash_dt(), NULLABLE),
155    ])
156    .into()
157}
158
159pub fn log() -> SchemaRef {
160    Schema::from(vec![
161        Field::new("removed", DataType::Boolean, NULLABLE),
162        Field::new("log_index", DataType::UInt64, !NULLABLE),
163        Field::new("transaction_index", DataType::UInt64, !NULLABLE),
164        Field::new("transaction_hash", hash_dt(), !NULLABLE),
165        Field::new("block_hash", hash_dt(), !NULLABLE),
166        Field::new("block_number", DataType::UInt64, !NULLABLE),
167        Field::new("address", addr_dt(), !NULLABLE),
168        Field::new("data", DataType::BinaryView, !NULLABLE),
169        Field::new("topic0", DataType::BinaryView, NULLABLE),
170        Field::new("topic1", DataType::BinaryView, NULLABLE),
171        Field::new("topic2", DataType::BinaryView, NULLABLE),
172        Field::new("topic3", DataType::BinaryView, NULLABLE),
173    ])
174    .into()
175}
176
177pub fn trace() -> SchemaRef {
178    Schema::from(vec![
179        Field::new("from", addr_dt(), NULLABLE),
180        Field::new("to", addr_dt(), NULLABLE),
181        Field::new("call_type", DataType::Utf8View, NULLABLE),
182        Field::new("gas", quantity_dt(), NULLABLE),
183        Field::new("input", DataType::BinaryView, NULLABLE),
184        Field::new("init", DataType::BinaryView, NULLABLE),
185        Field::new("value", quantity_dt(), NULLABLE),
186        Field::new("author", addr_dt(), NULLABLE),
187        Field::new("reward_type", DataType::Utf8View, NULLABLE),
188        Field::new("block_hash", DataType::BinaryView, !NULLABLE),
189        Field::new("block_number", DataType::UInt64, !NULLABLE),
190        Field::new("address", addr_dt(), NULLABLE),
191        Field::new("code", DataType::BinaryView, NULLABLE),
192        Field::new("gas_used", quantity_dt(), NULLABLE),
193        Field::new("output", DataType::BinaryView, NULLABLE),
194        Field::new("subtraces", DataType::UInt64, NULLABLE),
195        Field::new("trace_address", DataType::BinaryView, NULLABLE),
196        Field::new("transaction_hash", DataType::BinaryView, NULLABLE),
197        Field::new("transaction_position", DataType::UInt64, NULLABLE),
198        Field::new("type", DataType::Utf8View, NULLABLE),
199        Field::new("error", DataType::Utf8View, NULLABLE),
200        Field::new("sighash", DataType::BinaryView, NULLABLE),
201        Field::new("action_address", addr_dt(), NULLABLE),
202        Field::new("balance", quantity_dt(), NULLABLE),
203        Field::new("refund_address", addr_dt(), NULLABLE),
204    ])
205    .into()
206}
207
208pub fn concat_chunks(chunks: &[Arc<ArrowChunk>]) -> Result<ArrowChunk> {
209    if chunks.is_empty() {
210        return Err(anyhow!("can't concat 0 chunks"));
211    }
212
213    let num_cols = chunks[0].columns().len();
214
215    let cols = (0..num_cols)
216        .map(|col| {
217            let mut is_utf8 = false;
218            let arrs = chunks
219                .iter()
220                .map(|chunk| {
221                    let col = chunk
222                        .columns()
223                        .get(col)
224                        .map(|col| col.as_ref())
225                        .context("get column")?;
226                    is_utf8 = col.data_type() == &DataType::Utf8;
227                    Ok(col)
228                })
229                .collect::<Result<Vec<_>>>()?;
230            if !is_utf8 {
231                compute::concatenate::concatenate(&arrs).context("concat arrays")
232            } else {
233                let arrs = arrs
234                    .into_iter()
235                    .map(|a| {
236                        a.as_any()
237                            .downcast_ref::<Utf8Array<i32>>()
238                            .unwrap()
239                            .to_binary()
240                            .boxed()
241                    })
242                    .collect::<Vec<_>>();
243                let arrs = arrs.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
244                let arr =
245                    compute::concatenate::concatenate(arrs.as_slice()).context("concat arrays")?;
246
247                Ok(compute::cast::binary_to_utf8(
248                    arr.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
249                    DataType::Utf8,
250                )
251                .unwrap()
252                .boxed())
253            }
254        })
255        .collect::<Result<Vec<_>>>()?;
256
257    Ok(ArrowChunk::new(cols))
258}
259
260pub fn empty_chunk(schema: &Schema) -> ArrowChunk {
261    let mut cols = Vec::new();
262    for field in schema.fields.iter() {
263        cols.push(new_empty_array(field.data_type().clone()));
264    }
265    ArrowChunk::new(cols)
266}
267
268#[cfg(test)]
269mod tests {
270    use super::*;
271
272    #[test]
273    fn smoke_test_schema_constructors() {
274        block_header();
275        transaction();
276        log();
277        trace();
278    }
279
280    #[test]
281    fn test_concat_utf8() {
282        let chunks = [
283            Arc::new(Chunk::new(vec![Utf8Array::<i32>::from(&[Some(
284                "hello".to_owned(),
285            )])
286            .boxed()])),
287            Arc::new(Chunk::new(vec![Utf8Array::<i32>::from(&[Some(
288                "world".to_owned(),
289            )])
290            .boxed()])),
291        ];
292
293        let out = concat_chunks(&chunks).unwrap();
294
295        assert_eq!(
296            out,
297            ArrowChunk::new(vec![Utf8Array::<i32>::from(&[
298                Some("hello".to_owned()),
299                Some("world".to_owned())
300            ])
301            .boxed(),])
302        )
303    }
304}