hyperfuel_schema/
lib.rs

1use std::sync::Arc;
2
3use anyhow::{anyhow, Context, Result};
4use polars_arrow::array::{new_empty_array, Array, BinaryArray, Utf8Array};
5use polars_arrow::compute;
6use polars_arrow::datatypes::{ArrowDataType as DataType, ArrowSchema as Schema, Field, SchemaRef};
7use polars_arrow::record_batch::RecordBatchT as Chunk;
8
9mod util;
10
11pub use util::project_schema;
12
13pub type ArrowChunk = Chunk<Box<dyn Array>>;
14
15pub fn block_header() -> SchemaRef {
16    Schema::from(vec![
17        Field::new("id", DataType::BinaryView, false),
18        Field::new("da_height", DataType::UInt64, false),
19        Field::new("consensus_parameters_version", DataType::UInt64, false), // new
20        Field::new("state_transition_bytecode_version", DataType::UInt64, false), // new
21        Field::new("transactions_count", DataType::UInt64, false),
22        Field::new("message_receipt_count", DataType::UInt64, false),
23        Field::new("transactions_root", DataType::BinaryView, false),
24        Field::new("message_outbox_root", DataType::BinaryView, false), // renamed
25        Field::new("event_inbox_root", DataType::BinaryView, false),    // new
26        Field::new("height", DataType::UInt64, false),
27        Field::new("prev_root", DataType::BinaryView, false),
28        Field::new("time", DataType::Int64, false),
29        Field::new("application_hash", DataType::BinaryView, false),
30    ])
31    .into()
32}
33
34pub fn transaction() -> SchemaRef {
35    Schema::from(vec![
36        // block number
37        Field::new("block_height", DataType::UInt64, false),
38        Field::new("id", DataType::BinaryView, false),
39        // vec
40        Field::new("input_asset_ids", DataType::BinaryView, true),
41        // vec
42        Field::new("input_contracts", DataType::BinaryView, true),
43        Field::new("input_contract_utxo_id", DataType::BinaryView, true),
44        Field::new("input_contract_balance_root", DataType::BinaryView, true),
45        Field::new("input_contract_state_root", DataType::BinaryView, true),
46        Field::new(
47            "input_contract_tx_pointer_block_height",
48            DataType::UInt64,
49            true,
50        ),
51        Field::new("input_contract_tx_pointer_tx_index", DataType::UInt64, true),
52        Field::new("input_contract", DataType::BinaryView, true),
53        // Field::new("gas_price", DataType::UInt64, true), // removed
54        Field::new("policies_tip", DataType::UInt64, true), // new
55        Field::new("policies_witness_limit", DataType::UInt64, true), // new
56        Field::new("policies_maturity", DataType::UInt64, true), // new
57        Field::new("policies_max_fee", DataType::UInt64, true), // new
58        Field::new("script_gas_limit", DataType::UInt64, true), // renamed
59        Field::new("maturity", DataType::UInt64, true),
60        Field::new("mint_amount", DataType::UInt64, true),
61        Field::new("mint_asset_id", DataType::BinaryView, true),
62        Field::new("mint_gas_price", DataType::UInt64, true), // new
63        Field::new("tx_pointer_block_height", DataType::UInt64, true),
64        Field::new("tx_pointer_tx_index", DataType::UInt64, true),
65        Field::new("tx_type", DataType::UInt8, false), // not changes, but new tx_types (upgrade, upload)
66        Field::new("output_contract_input_index", DataType::UInt64, true),
67        Field::new("output_contract_balance_root", DataType::BinaryView, true),
68        Field::new("output_contract_state_root", DataType::BinaryView, true),
69        // vec
70        Field::new("witnesses", DataType::BinaryView, true),
71        Field::new("receipts_root", DataType::BinaryView, true),
72        Field::new("status", DataType::UInt8, false),
73        Field::new("time", DataType::Int64, false),
74        Field::new("reason", DataType::Utf8View, true),
75        Field::new("script", DataType::BinaryView, true),
76        Field::new("script_data", DataType::BinaryView, true),
77        Field::new("bytecode_witness_index", DataType::UInt64, true),
78        // Field::new("bytecode_length", DataType::UInt64, true), // removed
79        Field::new("bytecode_root", DataType::BinaryView, true), // new
80        Field::new("subsection_index", DataType::UInt64, true),  // new
81        Field::new("subsections_number", DataType::UInt64, true), // new
82        // vec
83        // Field::new("storage_slots", DataType::Binary, true), // new
84        // vec
85        Field::new("proof_set", DataType::BinaryView, true), // new
86        Field::new(
87            "consensus_parameters_upgrade_purpose_witness_index",
88            DataType::UInt64,
89            true,
90        ), // new
91        Field::new(
92            "consensus_parameters_upgrade_purpose_checksum",
93            DataType::BinaryView,
94            true,
95        ), // new
96        Field::new(
97            "state_transition_upgrade_purpose_root",
98            DataType::BinaryView,
99            true,
100        ), // new
101        Field::new("salt", DataType::BinaryView, true),
102    ])
103    .into()
104}
105
106pub fn receipt() -> SchemaRef {
107    Schema::from(vec![
108        // receipt index is unique per block
109        Field::new("receipt_index", DataType::UInt64, false),
110        Field::new("root_contract_id", DataType::BinaryView, true),
111        Field::new("tx_id", DataType::BinaryView, false),
112        Field::new("tx_status", DataType::UInt8, false), // new
113        Field::new("tx_type", DataType::UInt8, false),   // new
114        Field::new("block_height", DataType::UInt64, false),
115        Field::new("pc", DataType::UInt64, true),
116        Field::new("is", DataType::UInt64, true),
117        Field::new("to", DataType::BinaryView, true),
118        Field::new("to_address", DataType::BinaryView, true),
119        Field::new("amount", DataType::UInt64, true),
120        Field::new("asset_id", DataType::BinaryView, true),
121        Field::new("gas", DataType::UInt64, true),
122        Field::new("param1", DataType::UInt64, true),
123        Field::new("param2", DataType::UInt64, true),
124        Field::new("val", DataType::UInt64, true),
125        Field::new("ptr", DataType::UInt64, true),
126        Field::new("digest", DataType::BinaryView, true),
127        Field::new("reason", DataType::UInt64, true),
128        Field::new("ra", DataType::UInt64, true),
129        Field::new("rb", DataType::UInt64, true),
130        Field::new("rc", DataType::UInt64, true),
131        Field::new("rd", DataType::UInt64, true),
132        Field::new("len", DataType::UInt64, true),
133        Field::new("receipt_type", DataType::UInt8, false),
134        Field::new("result", DataType::UInt64, true),
135        Field::new("gas_used", DataType::UInt64, true),
136        Field::new("data", DataType::BinaryView, true),
137        Field::new("sender", DataType::BinaryView, true),
138        Field::new("recipient", DataType::BinaryView, true),
139        Field::new("nonce", DataType::BinaryView, true),
140        Field::new("contract_id", DataType::BinaryView, true),
141        Field::new("sub_id", DataType::BinaryView, true),
142    ])
143    .into()
144}
145
146pub fn input() -> SchemaRef {
147    Schema::from(vec![
148        // for mapping
149        Field::new("tx_id", DataType::BinaryView, false),
150        Field::new("tx_status", DataType::UInt8, false), // new
151        Field::new("tx_type", DataType::UInt8, false),   // new
152        Field::new("block_height", DataType::UInt64, false),
153        Field::new("input_type", DataType::UInt8, false),
154        Field::new("utxo_id", DataType::BinaryView, true),
155        Field::new("owner", DataType::BinaryView, true),
156        Field::new("amount", DataType::UInt64, true),
157        Field::new("asset_id", DataType::BinaryView, true),
158        Field::new("tx_pointer_block_height", DataType::UInt64, true),
159        Field::new("tx_pointer_tx_index", DataType::UInt64, true),
160        Field::new("witness_index", DataType::UInt64, true),
161        Field::new("predicate_gas_used", DataType::UInt64, true),
162        Field::new("predicate", DataType::BinaryView, true),
163        Field::new("predicate_data", DataType::BinaryView, true),
164        Field::new("balance_root", DataType::BinaryView, true),
165        Field::new("state_root", DataType::BinaryView, true),
166        Field::new("contract", DataType::BinaryView, true),
167        Field::new("sender", DataType::BinaryView, true),
168        Field::new("recipient", DataType::BinaryView, true),
169        Field::new("nonce", DataType::BinaryView, true),
170        Field::new("data", DataType::BinaryView, true),
171    ])
172    .into()
173}
174
175pub fn output() -> SchemaRef {
176    Schema::from(vec![
177        // for mapping
178        Field::new("tx_id", DataType::BinaryView, false),
179        Field::new("tx_status", DataType::UInt8, false), // new
180        Field::new("tx_type", DataType::UInt8, false),   // new
181        Field::new("block_height", DataType::UInt64, false),
182        Field::new("output_type", DataType::UInt8, false),
183        Field::new("to", DataType::BinaryView, true),
184        Field::new("amount", DataType::UInt64, true),
185        Field::new("asset_id", DataType::BinaryView, true),
186        Field::new("input_index", DataType::UInt64, true),
187        Field::new("balance_root", DataType::BinaryView, true),
188        Field::new("state_root", DataType::BinaryView, true),
189        Field::new("contract", DataType::BinaryView, true),
190    ])
191    .into()
192}
193
194pub fn concat_chunks(chunks: &[Arc<ArrowChunk>]) -> Result<ArrowChunk> {
195    if chunks.is_empty() {
196        return Err(anyhow!("can't concat 0 chunks"));
197    }
198
199    let num_cols = chunks[0].columns().len();
200
201    let cols = (0..num_cols)
202        .map(|col| {
203            let mut is_utf8 = false;
204            let arrs = chunks
205                .iter()
206                .map(|chunk| {
207                    let col = chunk
208                        .columns()
209                        .get(col)
210                        .map(|col| col.as_ref())
211                        .context("get column")?;
212                    is_utf8 = col.data_type() == &DataType::Utf8;
213                    Ok(col)
214                })
215                .collect::<Result<Vec<_>>>()?;
216            if !is_utf8 {
217                compute::concatenate::concatenate(&arrs).context("concat arrays")
218            } else {
219                let arrs = arrs
220                    .into_iter()
221                    .map(|a| {
222                        a.as_any()
223                            .downcast_ref::<Utf8Array<i32>>()
224                            .unwrap()
225                            .to_binary()
226                            .boxed()
227                    })
228                    .collect::<Vec<_>>();
229                let arrs = arrs.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
230                let arr =
231                    compute::concatenate::concatenate(arrs.as_slice()).context("concat arrays")?;
232
233                Ok(compute::cast::binary_to_utf8(
234                    arr.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
235                    DataType::Utf8,
236                )
237                .unwrap()
238                .boxed())
239            }
240        })
241        .collect::<Result<Vec<_>>>()?;
242
243    Ok(ArrowChunk::new(cols))
244}
245
246pub fn empty_chunk(schema: &Schema) -> ArrowChunk {
247    let mut cols = Vec::new();
248    for field in schema.fields.iter() {
249        cols.push(new_empty_array(field.data_type().clone()));
250    }
251    ArrowChunk::new(cols)
252}
253
254#[cfg(test)]
255mod tests {
256    use super::*;
257
258    #[test]
259    fn smoke_test_schema_constructors() {
260        block_header();
261        transaction();
262        receipt();
263        input();
264        output();
265    }
266
267    #[test]
268    fn test_concat_utf8() {
269        let chunks = [
270            Arc::new(Chunk::new(vec![Utf8Array::<i32>::from(&[Some(
271                "hello".to_owned(),
272            )])
273            .boxed()])),
274            Arc::new(Chunk::new(vec![Utf8Array::<i32>::from(&[Some(
275                "world".to_owned(),
276            )])
277            .boxed()])),
278        ];
279
280        let out = concat_chunks(&chunks).unwrap();
281
282        assert_eq!(
283            out,
284            ArrowChunk::new(vec![Utf8Array::<i32>::from(&[
285                Some("hello".to_owned()),
286                Some("world".to_owned())
287            ])
288            .boxed(),])
289        )
290    }
291}