use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use arrow2::array::{new_empty_array, Array};
use arrow2::chunk::Chunk;
use arrow2::compute;
use arrow2::datatypes::{DataType, Field, Schema, SchemaRef};
mod util;
pub use util::project_schema;
pub type ArrowChunk = Chunk<Box<dyn Array>>;
pub fn block_header() -> SchemaRef {
Schema::from(vec![
Field::new("id", DataType::Binary, false),
Field::new("da_height", DataType::UInt64, false),
Field::new("consensus_parameters_version", DataType::UInt64, false), Field::new("state_transition_bytecode_version", DataType::UInt64, false), Field::new("transactions_count", DataType::UInt64, false),
Field::new("message_receipt_count", DataType::UInt64, false),
Field::new("transactions_root", DataType::Binary, false),
Field::new("message_outbox_root", DataType::Binary, false), Field::new("event_inbox_root", DataType::Binary, false), Field::new("height", DataType::UInt64, false),
Field::new("prev_root", DataType::Binary, false),
Field::new("time", DataType::Int64, false),
Field::new("application_hash", DataType::Binary, false),
])
.into()
}
pub fn transaction() -> SchemaRef {
Schema::from(vec![
Field::new("block_height", DataType::UInt64, false),
Field::new("id", DataType::Binary, false),
Field::new("input_asset_ids", DataType::Binary, true),
Field::new("input_contracts", DataType::Binary, true),
Field::new("input_contract_utxo_id", DataType::Binary, true),
Field::new("input_contract_balance_root", DataType::Binary, true),
Field::new("input_contract_state_root", DataType::Binary, true),
Field::new(
"input_contract_tx_pointer_block_height",
DataType::UInt64,
true,
),
Field::new("input_contract_tx_pointer_tx_index", DataType::UInt64, true),
Field::new("input_contract", DataType::Binary, true),
Field::new("policies_tip", DataType::UInt64, true), Field::new("policies_witness_limit", DataType::UInt64, true), Field::new("policies_maturity", DataType::UInt64, true), Field::new("policies_max_fee", DataType::UInt64, true), Field::new("script_gas_limit", DataType::UInt64, true), Field::new("maturity", DataType::UInt64, true),
Field::new("mint_amount", DataType::UInt64, true),
Field::new("mint_asset_id", DataType::Binary, true),
Field::new("mint_gas_price", DataType::UInt64, true), Field::new("tx_pointer_block_height", DataType::UInt64, true),
Field::new("tx_pointer_tx_index", DataType::UInt64, true),
Field::new("tx_type", DataType::UInt8, false), Field::new("output_contract_input_index", DataType::UInt64, true),
Field::new("output_contract_balance_root", DataType::Binary, true),
Field::new("output_contract_state_root", DataType::Binary, true),
Field::new("witnesses", DataType::Binary, true),
Field::new("receipts_root", DataType::Binary, true),
Field::new("status", DataType::UInt8, false),
Field::new("time", DataType::Int64, false),
Field::new("reason", DataType::Utf8, true),
Field::new("script", DataType::Binary, true),
Field::new("script_data", DataType::Binary, true),
Field::new("bytecode_witness_index", DataType::UInt64, true),
Field::new("bytecode_root", DataType::Binary, true), Field::new("subsection_index", DataType::UInt64, true), Field::new("subsections_number", DataType::UInt64, true), Field::new("proof_set", DataType::Binary, true), Field::new(
"consensus_parameters_upgrade_purpose_witness_index",
DataType::UInt64,
true,
), Field::new(
"consensus_parameters_upgrade_purpose_checksum",
DataType::Binary,
true,
), Field::new(
"state_transition_upgrade_purpose_root",
DataType::Binary,
true,
), Field::new("salt", DataType::Binary, true),
])
.into()
}
pub fn receipt() -> SchemaRef {
Schema::from(vec![
Field::new("receipt_index", DataType::UInt64, false),
Field::new("root_contract_id", DataType::Binary, true),
Field::new("tx_id", DataType::Binary, false),
Field::new("tx_status", DataType::UInt8, false), Field::new("tx_type", DataType::UInt8, false), Field::new("block_height", DataType::UInt64, false),
Field::new("pc", DataType::UInt64, true),
Field::new("is", DataType::UInt64, true),
Field::new("to", DataType::Binary, true),
Field::new("to_address", DataType::Binary, true),
Field::new("amount", DataType::UInt64, true),
Field::new("asset_id", DataType::Binary, true),
Field::new("gas", DataType::UInt64, true),
Field::new("param1", DataType::UInt64, true),
Field::new("param2", DataType::UInt64, true),
Field::new("val", DataType::UInt64, true),
Field::new("ptr", DataType::UInt64, true),
Field::new("digest", DataType::Binary, true),
Field::new("reason", DataType::UInt64, true),
Field::new("ra", DataType::UInt64, true),
Field::new("rb", DataType::UInt64, true),
Field::new("rc", DataType::UInt64, true),
Field::new("rd", DataType::UInt64, true),
Field::new("len", DataType::UInt64, true),
Field::new("receipt_type", DataType::UInt8, false),
Field::new("result", DataType::UInt64, true),
Field::new("gas_used", DataType::UInt64, true),
Field::new("data", DataType::Binary, true),
Field::new("sender", DataType::Binary, true),
Field::new("recipient", DataType::Binary, true),
Field::new("nonce", DataType::Binary, true),
Field::new("contract_id", DataType::Binary, true),
Field::new("sub_id", DataType::Binary, true),
])
.into()
}
pub fn input() -> SchemaRef {
Schema::from(vec![
Field::new("tx_id", DataType::Binary, false),
Field::new("tx_status", DataType::UInt8, false), Field::new("tx_type", DataType::UInt8, false), Field::new("block_height", DataType::UInt64, false),
Field::new("input_type", DataType::UInt8, false),
Field::new("utxo_id", DataType::Binary, true),
Field::new("owner", DataType::Binary, true),
Field::new("amount", DataType::UInt64, true),
Field::new("asset_id", DataType::Binary, true),
Field::new("tx_pointer_block_height", DataType::UInt64, true),
Field::new("tx_pointer_tx_index", DataType::UInt64, true),
Field::new("witness_index", DataType::UInt64, true),
Field::new("predicate_gas_used", DataType::UInt64, true),
Field::new("predicate", DataType::Binary, true),
Field::new("predicate_data", DataType::Binary, true),
Field::new("balance_root", DataType::Binary, true),
Field::new("state_root", DataType::Binary, true),
Field::new("contract", DataType::Binary, true),
Field::new("sender", DataType::Binary, true),
Field::new("recipient", DataType::Binary, true),
Field::new("nonce", DataType::Binary, true),
Field::new("data", DataType::Binary, true),
])
.into()
}
pub fn output() -> SchemaRef {
Schema::from(vec![
Field::new("tx_id", DataType::Binary, false),
Field::new("tx_status", DataType::UInt8, false), Field::new("tx_type", DataType::UInt8, false), Field::new("block_height", DataType::UInt64, false),
Field::new("output_type", DataType::UInt8, false),
Field::new("to", DataType::Binary, true),
Field::new("amount", DataType::UInt64, true),
Field::new("asset_id", DataType::Binary, true),
Field::new("input_index", DataType::UInt64, true),
Field::new("balance_root", DataType::Binary, true),
Field::new("state_root", DataType::Binary, true),
Field::new("contract", DataType::Binary, true),
])
.into()
}
pub fn concat_chunks(chunks: &[Arc<ArrowChunk>]) -> Result<ArrowChunk> {
if chunks.is_empty() {
return Err(anyhow!("can't concat 0 chunks"));
}
let num_cols = chunks[0].columns().len();
let cols = (0..num_cols)
.map(|col| {
let arrs = chunks
.iter()
.map(|chunk| {
chunk
.columns()
.get(col)
.map(|col| col.as_ref())
.context("get column")
})
.collect::<Result<Vec<_>>>()?;
compute::concatenate::concatenate(&arrs).context("concat arrays")
})
.collect::<Result<Vec<_>>>()?;
Ok(ArrowChunk::new(cols))
}
pub fn empty_chunk(schema: &Schema) -> ArrowChunk {
let mut cols = Vec::new();
for field in schema.fields.iter() {
cols.push(new_empty_array(field.data_type().clone()));
}
ArrowChunk::new(cols)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn smoke_test_schema_constructors() {
block_header();
transaction();
receipt();
input();
output();
}
}