hypersync_schema/
lib.rs

1use std::sync::Arc;
2
3use anyhow::{anyhow, Context, Result};
4use polars_arrow::array::{new_empty_array, Array, BinaryArray, Utf8Array};
5use polars_arrow::compute;
6use polars_arrow::datatypes::{ArrowDataType as DataType, ArrowSchema as Schema, Field, SchemaRef};
7use polars_arrow::record_batch::RecordBatchT as Chunk;
8
9mod util;
10
11pub use util::project_schema;
12
13pub type ArrowChunk = Chunk<Box<dyn Array>>;
14
15fn hash_dt() -> DataType {
16    DataType::BinaryView
17}
18
19fn addr_dt() -> DataType {
20    DataType::BinaryView
21}
22
23fn quantity_dt() -> DataType {
24    DataType::BinaryView
25}
26
27pub fn block_header() -> SchemaRef {
28    Schema::from(vec![
29        Field::new("number", DataType::UInt64, false),
30        Field::new("hash", hash_dt(), false),
31        Field::new("parent_hash", hash_dt(), false),
32        Field::new("nonce", DataType::BinaryView, true),
33        Field::new("sha3_uncles", hash_dt(), false),
34        Field::new("logs_bloom", DataType::BinaryView, false),
35        Field::new("transactions_root", hash_dt(), false),
36        Field::new("state_root", hash_dt(), false),
37        Field::new("receipts_root", hash_dt(), false),
38        Field::new("miner", addr_dt(), false),
39        Field::new("difficulty", quantity_dt(), true),
40        Field::new("total_difficulty", quantity_dt(), true),
41        Field::new("extra_data", DataType::BinaryView, false),
42        Field::new("size", quantity_dt(), false),
43        Field::new("gas_limit", quantity_dt(), false),
44        Field::new("gas_used", quantity_dt(), false),
45        Field::new("timestamp", quantity_dt(), false),
46        Field::new("uncles", DataType::BinaryView, true),
47        Field::new("base_fee_per_gas", quantity_dt(), true),
48        Field::new("blob_gas_used", quantity_dt(), true),
49        Field::new("excess_blob_gas", quantity_dt(), true),
50        Field::new("parent_beacon_block_root", hash_dt(), true),
51        Field::new("withdrawals_root", hash_dt(), true),
52        Field::new("withdrawals", DataType::BinaryView, true),
53        Field::new("l1_block_number", DataType::UInt64, true),
54        Field::new("send_count", quantity_dt(), true),
55        Field::new("send_root", hash_dt(), true),
56        Field::new("mix_hash", hash_dt(), true),
57    ])
58    .into()
59}
60
61pub fn transaction() -> SchemaRef {
62    Schema::from(vec![
63        Field::new("block_hash", hash_dt(), false),
64        Field::new("block_number", DataType::UInt64, false),
65        Field::new("from", addr_dt(), true),
66        Field::new("gas", quantity_dt(), false),
67        Field::new("gas_price", quantity_dt(), true),
68        Field::new("hash", hash_dt(), false),
69        Field::new("input", DataType::BinaryView, false),
70        Field::new("nonce", quantity_dt(), false),
71        Field::new("to", addr_dt(), true),
72        Field::new("transaction_index", DataType::UInt64, false),
73        Field::new("value", quantity_dt(), false),
74        Field::new("v", quantity_dt(), true),
75        Field::new("r", quantity_dt(), true),
76        Field::new("s", quantity_dt(), true),
77        Field::new("max_priority_fee_per_gas", quantity_dt(), true),
78        Field::new("max_fee_per_gas", quantity_dt(), true),
79        Field::new("chain_id", quantity_dt(), true),
80        Field::new("cumulative_gas_used", quantity_dt(), false),
81        Field::new("effective_gas_price", quantity_dt(), false),
82        Field::new("gas_used", quantity_dt(), false),
83        Field::new("contract_address", addr_dt(), true),
84        Field::new("logs_bloom", DataType::BinaryView, false),
85        Field::new("type", DataType::UInt8, true),
86        Field::new("root", hash_dt(), true),
87        Field::new("status", DataType::UInt8, true),
88        Field::new("sighash", DataType::BinaryView, true),
89        Field::new("y_parity", quantity_dt(), true),
90        Field::new("access_list", DataType::BinaryView, true),
91        Field::new("l1_fee", quantity_dt(), true),
92        Field::new("l1_gas_price", quantity_dt(), true),
93        Field::new("l1_gas_used", quantity_dt(), true),
94        Field::new("l1_fee_scalar", quantity_dt(), true),
95        Field::new("gas_used_for_l1", quantity_dt(), true),
96        Field::new("max_fee_per_blob_gas", quantity_dt(), true),
97        Field::new("blob_versioned_hashes", DataType::BinaryView, true),
98        Field::new("deposit_nonce", quantity_dt(), true),
99        Field::new("blob_gas_price", quantity_dt(), true),
100        Field::new("deposit_receipt_version", quantity_dt(), true),
101        Field::new("blob_gas_used", quantity_dt(), true),
102        Field::new("l1_base_fee_scalar", quantity_dt(), true),
103        Field::new("l1_blob_base_fee", quantity_dt(), true),
104        Field::new("l1_blob_base_fee_scalar", quantity_dt(), true),
105        Field::new("l1_block_number", quantity_dt(), true),
106        Field::new("mint", quantity_dt(), true),
107        Field::new("source_hash", hash_dt(), true),
108    ])
109    .into()
110}
111
112pub fn log() -> SchemaRef {
113    Schema::from(vec![
114        Field::new("removed", DataType::Boolean, true),
115        Field::new("log_index", DataType::UInt64, false),
116        Field::new("transaction_index", DataType::UInt64, false),
117        Field::new("transaction_hash", hash_dt(), false),
118        Field::new("block_hash", hash_dt(), false),
119        Field::new("block_number", DataType::UInt64, false),
120        Field::new("address", addr_dt(), false),
121        Field::new("data", DataType::BinaryView, false),
122        Field::new("topic0", DataType::BinaryView, true),
123        Field::new("topic1", DataType::BinaryView, true),
124        Field::new("topic2", DataType::BinaryView, true),
125        Field::new("topic3", DataType::BinaryView, true),
126    ])
127    .into()
128}
129
130pub fn trace() -> SchemaRef {
131    Schema::from(vec![
132        Field::new("from", addr_dt(), true),
133        Field::new("to", addr_dt(), true),
134        Field::new("call_type", DataType::Utf8View, true),
135        Field::new("gas", quantity_dt(), true),
136        Field::new("input", DataType::BinaryView, true),
137        Field::new("init", DataType::BinaryView, true),
138        Field::new("value", quantity_dt(), true),
139        Field::new("author", addr_dt(), true),
140        Field::new("reward_type", DataType::Utf8View, true),
141        Field::new("block_hash", DataType::BinaryView, false),
142        Field::new("block_number", DataType::UInt64, false),
143        Field::new("address", addr_dt(), true),
144        Field::new("code", DataType::BinaryView, true),
145        Field::new("gas_used", quantity_dt(), true),
146        Field::new("output", DataType::BinaryView, true),
147        Field::new("subtraces", DataType::UInt64, true),
148        Field::new("trace_address", DataType::BinaryView, true),
149        Field::new("transaction_hash", DataType::BinaryView, true),
150        Field::new("transaction_position", DataType::UInt64, true),
151        Field::new("type", DataType::Utf8View, true),
152        Field::new("error", DataType::Utf8View, true),
153        Field::new("sighash", DataType::BinaryView, true),
154        Field::new("action_address", addr_dt(), true),
155        Field::new("balance", quantity_dt(), true),
156        Field::new("refund_address", addr_dt(), true),
157    ])
158    .into()
159}
160
161pub fn concat_chunks(chunks: &[Arc<ArrowChunk>]) -> Result<ArrowChunk> {
162    if chunks.is_empty() {
163        return Err(anyhow!("can't concat 0 chunks"));
164    }
165
166    let num_cols = chunks[0].columns().len();
167
168    let cols = (0..num_cols)
169        .map(|col| {
170            let mut is_utf8 = false;
171            let arrs = chunks
172                .iter()
173                .map(|chunk| {
174                    let col = chunk
175                        .columns()
176                        .get(col)
177                        .map(|col| col.as_ref())
178                        .context("get column")?;
179                    is_utf8 = col.data_type() == &DataType::Utf8;
180                    Ok(col)
181                })
182                .collect::<Result<Vec<_>>>()?;
183            if !is_utf8 {
184                compute::concatenate::concatenate(&arrs).context("concat arrays")
185            } else {
186                let arrs = arrs
187                    .into_iter()
188                    .map(|a| {
189                        a.as_any()
190                            .downcast_ref::<Utf8Array<i32>>()
191                            .unwrap()
192                            .to_binary()
193                            .boxed()
194                    })
195                    .collect::<Vec<_>>();
196                let arrs = arrs.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
197                let arr =
198                    compute::concatenate::concatenate(arrs.as_slice()).context("concat arrays")?;
199
200                Ok(compute::cast::binary_to_utf8(
201                    arr.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
202                    DataType::Utf8,
203                )
204                .unwrap()
205                .boxed())
206            }
207        })
208        .collect::<Result<Vec<_>>>()?;
209
210    Ok(ArrowChunk::new(cols))
211}
212
213pub fn empty_chunk(schema: &Schema) -> ArrowChunk {
214    let mut cols = Vec::new();
215    for field in schema.fields.iter() {
216        cols.push(new_empty_array(field.data_type().clone()));
217    }
218    ArrowChunk::new(cols)
219}
220
221#[cfg(test)]
222mod tests {
223    use super::*;
224
225    #[test]
226    fn smoke_test_schema_constructors() {
227        block_header();
228        transaction();
229        log();
230        trace();
231    }
232
233    #[test]
234    fn test_concat_utf8() {
235        let chunks = [
236            Arc::new(Chunk::new(vec![Utf8Array::<i32>::from(&[Some(
237                "hello".to_owned(),
238            )])
239            .boxed()])),
240            Arc::new(Chunk::new(vec![Utf8Array::<i32>::from(&[Some(
241                "world".to_owned(),
242            )])
243            .boxed()])),
244        ];
245
246        let out = concat_chunks(&chunks).unwrap();
247
248        assert_eq!(
249            out,
250            ArrowChunk::new(vec![Utf8Array::<i32>::from(&[
251                Some("hello".to_owned()),
252                Some("world".to_owned())
253            ])
254            .boxed(),])
255        )
256    }
257}