1use std::sync::Arc;
2
3use anyhow::{anyhow, Context, Result};
4use polars_arrow::array::{new_empty_array, Array, BinaryArray, Utf8Array};
5use polars_arrow::compute;
6use polars_arrow::datatypes::{ArrowDataType as DataType, ArrowSchema as Schema, Field, SchemaRef};
7use polars_arrow::record_batch::RecordBatchT as Chunk;
8
9mod util;
10
11pub use util::project_schema;
12
13pub type ArrowChunk = Chunk<Box<dyn Array>>;
14
15pub fn block_header() -> SchemaRef {
16 Schema::from(vec![
17 Field::new("id", DataType::BinaryView, false),
18 Field::new("da_height", DataType::UInt64, false),
19 Field::new("consensus_parameters_version", DataType::UInt64, false), Field::new("state_transition_bytecode_version", DataType::UInt64, false), Field::new("transactions_count", DataType::UInt64, false),
22 Field::new("message_receipt_count", DataType::UInt64, false),
23 Field::new("transactions_root", DataType::BinaryView, false),
24 Field::new("message_outbox_root", DataType::BinaryView, false), Field::new("event_inbox_root", DataType::BinaryView, false), Field::new("height", DataType::UInt64, false),
27 Field::new("prev_root", DataType::BinaryView, false),
28 Field::new("time", DataType::Int64, false),
29 Field::new("application_hash", DataType::BinaryView, false),
30 ])
31 .into()
32}
33
34pub fn transaction() -> SchemaRef {
35 Schema::from(vec![
36 Field::new("block_height", DataType::UInt64, false),
38 Field::new("id", DataType::BinaryView, false),
39 Field::new("input_asset_ids", DataType::BinaryView, true),
41 Field::new("input_contracts", DataType::BinaryView, true),
43 Field::new("input_contract_utxo_id", DataType::BinaryView, true),
44 Field::new("input_contract_balance_root", DataType::BinaryView, true),
45 Field::new("input_contract_state_root", DataType::BinaryView, true),
46 Field::new(
47 "input_contract_tx_pointer_block_height",
48 DataType::UInt64,
49 true,
50 ),
51 Field::new("input_contract_tx_pointer_tx_index", DataType::UInt64, true),
52 Field::new("input_contract", DataType::BinaryView, true),
53 Field::new("policies_tip", DataType::UInt64, true), Field::new("policies_witness_limit", DataType::UInt64, true), Field::new("policies_maturity", DataType::UInt64, true), Field::new("policies_max_fee", DataType::UInt64, true), Field::new("script_gas_limit", DataType::UInt64, true), Field::new("maturity", DataType::UInt64, true),
60 Field::new("mint_amount", DataType::UInt64, true),
61 Field::new("mint_asset_id", DataType::BinaryView, true),
62 Field::new("mint_gas_price", DataType::UInt64, true), Field::new("tx_pointer_block_height", DataType::UInt64, true),
64 Field::new("tx_pointer_tx_index", DataType::UInt64, true),
65 Field::new("tx_type", DataType::UInt8, false), Field::new("output_contract_input_index", DataType::UInt64, true),
67 Field::new("output_contract_balance_root", DataType::BinaryView, true),
68 Field::new("output_contract_state_root", DataType::BinaryView, true),
69 Field::new("witnesses", DataType::BinaryView, true),
71 Field::new("receipts_root", DataType::BinaryView, true),
72 Field::new("status", DataType::UInt8, false),
73 Field::new("time", DataType::Int64, false),
74 Field::new("reason", DataType::Utf8View, true),
75 Field::new("script", DataType::BinaryView, true),
76 Field::new("script_data", DataType::BinaryView, true),
77 Field::new("bytecode_witness_index", DataType::UInt64, true),
78 Field::new("bytecode_root", DataType::BinaryView, true), Field::new("subsection_index", DataType::UInt64, true), Field::new("subsections_number", DataType::UInt64, true), Field::new("proof_set", DataType::BinaryView, true), Field::new(
87 "consensus_parameters_upgrade_purpose_witness_index",
88 DataType::UInt64,
89 true,
90 ), Field::new(
92 "consensus_parameters_upgrade_purpose_checksum",
93 DataType::BinaryView,
94 true,
95 ), Field::new(
97 "state_transition_upgrade_purpose_root",
98 DataType::BinaryView,
99 true,
100 ), Field::new("salt", DataType::BinaryView, true),
102 ])
103 .into()
104}
105
106pub fn receipt() -> SchemaRef {
107 Schema::from(vec![
108 Field::new("receipt_index", DataType::UInt64, false),
110 Field::new("root_contract_id", DataType::BinaryView, true),
111 Field::new("tx_id", DataType::BinaryView, false),
112 Field::new("tx_status", DataType::UInt8, false), Field::new("tx_type", DataType::UInt8, false), Field::new("block_height", DataType::UInt64, false),
115 Field::new("pc", DataType::UInt64, true),
116 Field::new("is", DataType::UInt64, true),
117 Field::new("to", DataType::BinaryView, true),
118 Field::new("to_address", DataType::BinaryView, true),
119 Field::new("amount", DataType::UInt64, true),
120 Field::new("asset_id", DataType::BinaryView, true),
121 Field::new("gas", DataType::UInt64, true),
122 Field::new("param1", DataType::UInt64, true),
123 Field::new("param2", DataType::UInt64, true),
124 Field::new("val", DataType::UInt64, true),
125 Field::new("ptr", DataType::UInt64, true),
126 Field::new("digest", DataType::BinaryView, true),
127 Field::new("reason", DataType::UInt64, true),
128 Field::new("ra", DataType::UInt64, true),
129 Field::new("rb", DataType::UInt64, true),
130 Field::new("rc", DataType::UInt64, true),
131 Field::new("rd", DataType::UInt64, true),
132 Field::new("len", DataType::UInt64, true),
133 Field::new("receipt_type", DataType::UInt8, false),
134 Field::new("result", DataType::UInt64, true),
135 Field::new("gas_used", DataType::UInt64, true),
136 Field::new("data", DataType::BinaryView, true),
137 Field::new("sender", DataType::BinaryView, true),
138 Field::new("recipient", DataType::BinaryView, true),
139 Field::new("nonce", DataType::BinaryView, true),
140 Field::new("contract_id", DataType::BinaryView, true),
141 Field::new("sub_id", DataType::BinaryView, true),
142 ])
143 .into()
144}
145
146pub fn input() -> SchemaRef {
147 Schema::from(vec![
148 Field::new("tx_id", DataType::BinaryView, false),
150 Field::new("tx_status", DataType::UInt8, false), Field::new("tx_type", DataType::UInt8, false), Field::new("block_height", DataType::UInt64, false),
153 Field::new("input_type", DataType::UInt8, false),
154 Field::new("utxo_id", DataType::BinaryView, true),
155 Field::new("owner", DataType::BinaryView, true),
156 Field::new("amount", DataType::UInt64, true),
157 Field::new("asset_id", DataType::BinaryView, true),
158 Field::new("tx_pointer_block_height", DataType::UInt64, true),
159 Field::new("tx_pointer_tx_index", DataType::UInt64, true),
160 Field::new("witness_index", DataType::UInt64, true),
161 Field::new("predicate_gas_used", DataType::UInt64, true),
162 Field::new("predicate", DataType::BinaryView, true),
163 Field::new("predicate_data", DataType::BinaryView, true),
164 Field::new("balance_root", DataType::BinaryView, true),
165 Field::new("state_root", DataType::BinaryView, true),
166 Field::new("contract", DataType::BinaryView, true),
167 Field::new("sender", DataType::BinaryView, true),
168 Field::new("recipient", DataType::BinaryView, true),
169 Field::new("nonce", DataType::BinaryView, true),
170 Field::new("data", DataType::BinaryView, true),
171 ])
172 .into()
173}
174
175pub fn output() -> SchemaRef {
176 Schema::from(vec![
177 Field::new("tx_id", DataType::BinaryView, false),
179 Field::new("tx_status", DataType::UInt8, false), Field::new("tx_type", DataType::UInt8, false), Field::new("block_height", DataType::UInt64, false),
182 Field::new("output_type", DataType::UInt8, false),
183 Field::new("to", DataType::BinaryView, true),
184 Field::new("amount", DataType::UInt64, true),
185 Field::new("asset_id", DataType::BinaryView, true),
186 Field::new("input_index", DataType::UInt64, true),
187 Field::new("balance_root", DataType::BinaryView, true),
188 Field::new("state_root", DataType::BinaryView, true),
189 Field::new("contract", DataType::BinaryView, true),
190 ])
191 .into()
192}
193
194pub fn concat_chunks(chunks: &[Arc<ArrowChunk>]) -> Result<ArrowChunk> {
195 if chunks.is_empty() {
196 return Err(anyhow!("can't concat 0 chunks"));
197 }
198
199 let num_cols = chunks[0].columns().len();
200
201 let cols = (0..num_cols)
202 .map(|col| {
203 let mut is_utf8 = false;
204 let arrs = chunks
205 .iter()
206 .map(|chunk| {
207 let col = chunk
208 .columns()
209 .get(col)
210 .map(|col| col.as_ref())
211 .context("get column")?;
212 is_utf8 = col.data_type() == &DataType::Utf8;
213 Ok(col)
214 })
215 .collect::<Result<Vec<_>>>()?;
216 if !is_utf8 {
217 compute::concatenate::concatenate(&arrs).context("concat arrays")
218 } else {
219 let arrs = arrs
220 .into_iter()
221 .map(|a| {
222 a.as_any()
223 .downcast_ref::<Utf8Array<i32>>()
224 .unwrap()
225 .to_binary()
226 .boxed()
227 })
228 .collect::<Vec<_>>();
229 let arrs = arrs.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
230 let arr =
231 compute::concatenate::concatenate(arrs.as_slice()).context("concat arrays")?;
232
233 Ok(compute::cast::binary_to_utf8(
234 arr.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
235 DataType::Utf8,
236 )
237 .unwrap()
238 .boxed())
239 }
240 })
241 .collect::<Result<Vec<_>>>()?;
242
243 Ok(ArrowChunk::new(cols))
244}
245
246pub fn empty_chunk(schema: &Schema) -> ArrowChunk {
247 let mut cols = Vec::new();
248 for field in schema.fields.iter() {
249 cols.push(new_empty_array(field.data_type().clone()));
250 }
251 ArrowChunk::new(cols)
252}
253
254#[cfg(test)]
255mod tests {
256 use super::*;
257
258 #[test]
259 fn smoke_test_schema_constructors() {
260 block_header();
261 transaction();
262 receipt();
263 input();
264 output();
265 }
266
267 #[test]
268 fn test_concat_utf8() {
269 let chunks = [
270 Arc::new(Chunk::new(vec![Utf8Array::<i32>::from(&[Some(
271 "hello".to_owned(),
272 )])
273 .boxed()])),
274 Arc::new(Chunk::new(vec![Utf8Array::<i32>::from(&[Some(
275 "world".to_owned(),
276 )])
277 .boxed()])),
278 ];
279
280 let out = concat_chunks(&chunks).unwrap();
281
282 assert_eq!(
283 out,
284 ArrowChunk::new(vec![Utf8Array::<i32>::from(&[
285 Some("hello".to_owned()),
286 Some("world".to_owned())
287 ])
288 .boxed(),])
289 )
290 }
291}