skar_client/
column_mapping.rs

1use std::collections::BTreeMap;
2
3use anyhow::{anyhow, Context, Result};
4use polars_arrow::array::{
5    Array, BinaryViewArray, Float32Array, Float64Array, Int32Array, Int64Array,
6    MutablePrimitiveArray, PrimitiveArray, UInt32Array, UInt64Array,
7};
8use polars_arrow::compute::cast;
9use polars_arrow::datatypes::ArrowDataType;
10use polars_arrow::types::NativeType;
11use rayon::prelude::*;
12use ruint::aliases::U256;
13use serde::{Deserialize, Serialize};
14use skar_schema::ArrowChunk;
15
16#[derive(Default, Debug, Clone, Serialize, Deserialize)]
17pub struct ColumnMapping {
18    #[serde(default)]
19    pub block: BTreeMap<String, DataType>,
20    #[serde(default)]
21    pub transaction: BTreeMap<String, DataType>,
22    #[serde(default)]
23    pub log: BTreeMap<String, DataType>,
24    #[serde(default)]
25    pub trace: BTreeMap<String, DataType>,
26    #[serde(default)]
27    pub decoded_log: BTreeMap<String, DataType>,
28}
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "lowercase")]
32pub enum DataType {
33    Float64,
34    Float32,
35    UInt64,
36    UInt32,
37    Int64,
38    Int32,
39}
40
41impl From<DataType> for ArrowDataType {
42    fn from(value: DataType) -> Self {
43        match value {
44            DataType::Float64 => Self::Float64,
45            DataType::Float32 => Self::Float32,
46            DataType::UInt64 => Self::UInt64,
47            DataType::UInt32 => Self::UInt32,
48            DataType::Int64 => Self::Int64,
49            DataType::Int32 => Self::Int32,
50        }
51    }
52}
53
54pub fn apply_to_chunk(
55    chunk: &ArrowChunk,
56    field_names: &[&str],
57    mapping: &BTreeMap<String, DataType>,
58) -> Result<ArrowChunk> {
59    if mapping.is_empty() {
60        return Ok(chunk.clone());
61    }
62
63    let columns = chunk
64        .columns()
65        .par_iter()
66        .zip(field_names.par_iter())
67        .map(|(col, &field_name)| {
68            let col = match mapping.get(field_name) {
69                Some(&dt) => map_column(&**col, dt)
70                    .context(format!("apply cast to colum '{}'", field_name))?,
71                None => col.clone(),
72            };
73
74            Ok(col)
75        })
76        .collect::<Result<Vec<_>>>()?;
77
78    Ok(ArrowChunk::new(columns))
79}
80
81pub fn map_column(col: &dyn Array, target_data_type: DataType) -> Result<Box<dyn Array + 'static>> {
82    fn to_box<T: Array>(arr: T) -> Box<dyn Array> {
83        Box::new(arr)
84    }
85
86    match target_data_type {
87        DataType::Float64 => map_to_f64(col).map(to_box),
88        DataType::Float32 => map_to_f32(col).map(to_box),
89        DataType::UInt64 => map_to_uint64(col).map(to_box),
90        DataType::UInt32 => map_to_uint32(col).map(to_box),
91        DataType::Int64 => map_to_int64(col).map(to_box),
92        DataType::Int32 => map_to_int32(col).map(to_box),
93    }
94}
95
96fn map_to_f64(col: &dyn Array) -> Result<Float64Array> {
97    match col.data_type() {
98        &ArrowDataType::BinaryView => {
99            binary_to_target_array(col.as_any().downcast_ref::<BinaryViewArray>().unwrap())
100        }
101        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
102            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
103            &ArrowDataType::Float64,
104        )),
105        dt => Err(anyhow!("Can't convert {:?} to f64", dt)),
106    }
107}
108
109fn map_to_f32(col: &dyn Array) -> Result<Float32Array> {
110    match col.data_type() {
111        &ArrowDataType::BinaryView => {
112            binary_to_target_array(col.as_any().downcast_ref::<BinaryViewArray>().unwrap())
113        }
114        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
115            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
116            &ArrowDataType::Float32,
117        )),
118        dt => Err(anyhow!("Can't convert {:?} to f32", dt)),
119    }
120}
121
122fn map_to_uint64(col: &dyn Array) -> Result<UInt64Array> {
123    match col.data_type() {
124        &ArrowDataType::BinaryView => {
125            binary_to_target_array(col.as_any().downcast_ref::<BinaryViewArray>().unwrap())
126        }
127        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
128            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
129            &ArrowDataType::UInt64,
130        )),
131        dt => Err(anyhow!("Can't convert {:?} to uint64", dt)),
132    }
133}
134
135fn map_to_uint32(col: &dyn Array) -> Result<UInt32Array> {
136    match col.data_type() {
137        &ArrowDataType::BinaryView => {
138            binary_to_target_array(col.as_any().downcast_ref::<BinaryViewArray>().unwrap())
139        }
140        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
141            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
142            &ArrowDataType::UInt32,
143        )),
144        dt => Err(anyhow!("Can't convert {:?} to uint32", dt)),
145    }
146}
147
148fn map_to_int64(col: &dyn Array) -> Result<Int64Array> {
149    match col.data_type() {
150        &ArrowDataType::BinaryView => {
151            binary_to_target_array(col.as_any().downcast_ref::<BinaryViewArray>().unwrap())
152        }
153        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
154            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
155            &ArrowDataType::Int64,
156        )),
157        dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
158    }
159}
160
161fn map_to_int32(col: &dyn Array) -> Result<Int32Array> {
162    match col.data_type() {
163        &ArrowDataType::BinaryView => {
164            binary_to_target_array(col.as_any().downcast_ref::<BinaryViewArray>().unwrap())
165        }
166        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
167            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
168            &ArrowDataType::Int32,
169        )),
170        dt => Err(anyhow!("Can't convert {:?} to int32", dt)),
171    }
172}
173
174fn binary_to_target_array<T: NativeType + TryFrom<U256>>(
175    src: &BinaryViewArray,
176) -> Result<PrimitiveArray<T>> {
177    let mut out = MutablePrimitiveArray::with_capacity(src.len());
178
179    for val in src.iter() {
180        out.push(val.map(binary_to_target).transpose()?);
181    }
182
183    Ok(out.into())
184}
185
186fn binary_to_target<T: TryFrom<U256>>(src: &[u8]) -> Result<T> {
187    let big_num = U256::from_be_slice(src);
188    big_num
189        .try_into()
190        .map_err(|_e| anyhow!("failed to cast number to requested type"))
191}