1use std::collections::BTreeMap;
2
3use anyhow::{anyhow, Context, Result};
4use polars_arrow::array::{
5 Array, BinaryViewArray, Float32Array, Float64Array, Int32Array, Int64Array,
6 MutablePrimitiveArray, PrimitiveArray, UInt32Array, UInt64Array,
7};
8use polars_arrow::compute::cast;
9use polars_arrow::datatypes::ArrowDataType;
10use polars_arrow::types::NativeType;
11use rayon::prelude::*;
12use ruint::aliases::U256;
13use serde::{Deserialize, Serialize};
14use skar_schema::ArrowChunk;
15
16#[derive(Default, Debug, Clone, Serialize, Deserialize)]
17pub struct ColumnMapping {
18 #[serde(default)]
19 pub block: BTreeMap<String, DataType>,
20 #[serde(default)]
21 pub transaction: BTreeMap<String, DataType>,
22 #[serde(default)]
23 pub log: BTreeMap<String, DataType>,
24 #[serde(default)]
25 pub trace: BTreeMap<String, DataType>,
26 #[serde(default)]
27 pub decoded_log: BTreeMap<String, DataType>,
28}
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "lowercase")]
32pub enum DataType {
33 Float64,
34 Float32,
35 UInt64,
36 UInt32,
37 Int64,
38 Int32,
39}
40
41impl From<DataType> for ArrowDataType {
42 fn from(value: DataType) -> Self {
43 match value {
44 DataType::Float64 => Self::Float64,
45 DataType::Float32 => Self::Float32,
46 DataType::UInt64 => Self::UInt64,
47 DataType::UInt32 => Self::UInt32,
48 DataType::Int64 => Self::Int64,
49 DataType::Int32 => Self::Int32,
50 }
51 }
52}
53
54pub fn apply_to_chunk(
55 chunk: &ArrowChunk,
56 field_names: &[&str],
57 mapping: &BTreeMap<String, DataType>,
58) -> Result<ArrowChunk> {
59 if mapping.is_empty() {
60 return Ok(chunk.clone());
61 }
62
63 let columns = chunk
64 .columns()
65 .par_iter()
66 .zip(field_names.par_iter())
67 .map(|(col, &field_name)| {
68 let col = match mapping.get(field_name) {
69 Some(&dt) => map_column(&**col, dt)
70 .context(format!("apply cast to colum '{}'", field_name))?,
71 None => col.clone(),
72 };
73
74 Ok(col)
75 })
76 .collect::<Result<Vec<_>>>()?;
77
78 Ok(ArrowChunk::new(columns))
79}
80
81pub fn map_column(col: &dyn Array, target_data_type: DataType) -> Result<Box<dyn Array + 'static>> {
82 fn to_box<T: Array>(arr: T) -> Box<dyn Array> {
83 Box::new(arr)
84 }
85
86 match target_data_type {
87 DataType::Float64 => map_to_f64(col).map(to_box),
88 DataType::Float32 => map_to_f32(col).map(to_box),
89 DataType::UInt64 => map_to_uint64(col).map(to_box),
90 DataType::UInt32 => map_to_uint32(col).map(to_box),
91 DataType::Int64 => map_to_int64(col).map(to_box),
92 DataType::Int32 => map_to_int32(col).map(to_box),
93 }
94}
95
96fn map_to_f64(col: &dyn Array) -> Result<Float64Array> {
97 match col.data_type() {
98 &ArrowDataType::BinaryView => {
99 binary_to_target_array(col.as_any().downcast_ref::<BinaryViewArray>().unwrap())
100 }
101 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
102 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
103 &ArrowDataType::Float64,
104 )),
105 dt => Err(anyhow!("Can't convert {:?} to f64", dt)),
106 }
107}
108
109fn map_to_f32(col: &dyn Array) -> Result<Float32Array> {
110 match col.data_type() {
111 &ArrowDataType::BinaryView => {
112 binary_to_target_array(col.as_any().downcast_ref::<BinaryViewArray>().unwrap())
113 }
114 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
115 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
116 &ArrowDataType::Float32,
117 )),
118 dt => Err(anyhow!("Can't convert {:?} to f32", dt)),
119 }
120}
121
122fn map_to_uint64(col: &dyn Array) -> Result<UInt64Array> {
123 match col.data_type() {
124 &ArrowDataType::BinaryView => {
125 binary_to_target_array(col.as_any().downcast_ref::<BinaryViewArray>().unwrap())
126 }
127 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
128 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
129 &ArrowDataType::UInt64,
130 )),
131 dt => Err(anyhow!("Can't convert {:?} to uint64", dt)),
132 }
133}
134
135fn map_to_uint32(col: &dyn Array) -> Result<UInt32Array> {
136 match col.data_type() {
137 &ArrowDataType::BinaryView => {
138 binary_to_target_array(col.as_any().downcast_ref::<BinaryViewArray>().unwrap())
139 }
140 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
141 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
142 &ArrowDataType::UInt32,
143 )),
144 dt => Err(anyhow!("Can't convert {:?} to uint32", dt)),
145 }
146}
147
148fn map_to_int64(col: &dyn Array) -> Result<Int64Array> {
149 match col.data_type() {
150 &ArrowDataType::BinaryView => {
151 binary_to_target_array(col.as_any().downcast_ref::<BinaryViewArray>().unwrap())
152 }
153 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
154 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
155 &ArrowDataType::Int64,
156 )),
157 dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
158 }
159}
160
161fn map_to_int32(col: &dyn Array) -> Result<Int32Array> {
162 match col.data_type() {
163 &ArrowDataType::BinaryView => {
164 binary_to_target_array(col.as_any().downcast_ref::<BinaryViewArray>().unwrap())
165 }
166 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
167 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
168 &ArrowDataType::Int32,
169 )),
170 dt => Err(anyhow!("Can't convert {:?} to int32", dt)),
171 }
172}
173
174fn binary_to_target_array<T: NativeType + TryFrom<U256>>(
175 src: &BinaryViewArray,
176) -> Result<PrimitiveArray<T>> {
177 let mut out = MutablePrimitiveArray::with_capacity(src.len());
178
179 for val in src.iter() {
180 out.push(val.map(binary_to_target).transpose()?);
181 }
182
183 Ok(out.into())
184}
185
186fn binary_to_target<T: TryFrom<U256>>(src: &[u8]) -> Result<T> {
187 let big_num = U256::from_be_slice(src);
188 big_num
189 .try_into()
190 .map_err(|_e| anyhow!("failed to cast number to requested type"))
191}