// hyperfuel_client/column_mapping.rs

use std::collections::BTreeMap;

use anyhow::{anyhow, Context, Result};
use hyperfuel_schema::ArrowChunk;
use polars_arrow::array::{
    Array, BinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, MutablePrimitiveArray,
    PrimitiveArray, UInt32Array, UInt64Array,
};
use polars_arrow::compute::cast::CastOptionsImpl as CastOptions;
use polars_arrow::compute::{self, cast};
use polars_arrow::datatypes::{ArrowDataType, ArrowSchema as Schema, Field};
use polars_arrow::types::NativeType;
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
use ruint::aliases::U256;
use serde::{Deserialize, Serialize};

use crate::ArrowBatch;

19/// Column mapping for stream function output.
20/// It lets you map columns you want into the DataTypes you want.
21#[derive(Default, Debug, Clone, Serialize, Deserialize)]
22pub struct ColumnMapping {
23    /// Mapping for block data.
24    #[serde(default)]
25    pub block: BTreeMap<String, DataType>,
26    /// Mapping for transaction data.
27    #[serde(default)]
28    pub transaction: BTreeMap<String, DataType>,
29    /// Mapping for log data.
30    #[serde(default)]
31    pub receipt: BTreeMap<String, DataType>,
32    /// Mapping for trace data.
33    #[serde(default)]
34    pub input: BTreeMap<String, DataType>,
35    /// Mapping for decoded log data.
36    #[serde(default)]
37    pub output: BTreeMap<String, DataType>,
38}
39
40#[allow(missing_docs)]
41/// `DataType` is an enumeration representing the different data types that can be used in the column mapping.
42/// Each variant corresponds to a specific data type.
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(rename_all = "lowercase")]
45pub enum DataType {
46    Float64,
47    Float32,
48    UInt64,
49    UInt32,
50    Int64,
51    Int32,
52}
53
54impl From<DataType> for ArrowDataType {
55    fn from(value: DataType) -> Self {
56        match value {
57            DataType::Float64 => Self::Float64,
58            DataType::Float32 => Self::Float32,
59            DataType::UInt64 => Self::UInt64,
60            DataType::UInt32 => Self::UInt32,
61            DataType::Int64 => Self::Int64,
62            DataType::Int32 => Self::Int32,
63        }
64    }
65}
66
67pub fn apply_to_batch(
68    batch: &ArrowBatch,
69    mapping: &BTreeMap<String, DataType>,
70) -> Result<ArrowBatch> {
71    if mapping.is_empty() {
72        return Ok(batch.clone());
73    }
74
75    let (fields, cols) = batch
76        .chunk
77        .columns()
78        .par_iter()
79        .zip(batch.schema.fields.par_iter())
80        .map(|(col, field)| {
81            let col = match mapping.get(&field.name) {
82                Some(&dt) => {
83                    if field.name == "l1_fee_scalar" {
84                        map_l1_fee_scalar(&**col, dt)
85                            .context(format!("apply cast to column '{}'", field.name))?
86                    } else {
87                        map_column(&**col, dt)
88                            .context(format!("apply cast to colum '{}'", field.name))?
89                    }
90                }
91                None => col.clone(),
92            };
93
94            Ok((
95                Field::new(
96                    field.name.clone(),
97                    col.data_type().clone(),
98                    field.is_nullable,
99                ),
100                col,
101            ))
102        })
103        .collect::<Result<(Vec<_>, Vec<_>)>>()?;
104
105    Ok(ArrowBatch {
106        chunk: ArrowChunk::new(cols).into(),
107        schema: Schema::from(fields).into(),
108    })
109}
110
111pub fn map_l1_fee_scalar(
112    col: &dyn Array,
113    target_data_type: DataType,
114) -> Result<Box<dyn Array + 'static>> {
115    let col = col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
116    let col = Float64Array::from_iter(
117        col.iter()
118            .map(|v| v.map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())),
119    );
120
121    let arr = compute::cast::cast(
122        &*to_box(col),
123        &target_data_type.into(),
124        CastOptions {
125            wrapped: false,
126            partial: false,
127        },
128    )
129    .with_context(|| anyhow!("failed to l1_fee_scalar to {:?}", target_data_type))?;
130
131    Ok(arr)
132}
133
134fn to_box<T: Array>(arr: T) -> Box<dyn Array> {
135    Box::new(arr)
136}
137
138fn map_column(col: &dyn Array, target_data_type: DataType) -> Result<Box<dyn Array + 'static>> {
139    match target_data_type {
140        DataType::Float64 => map_to_f64(col).map(to_box),
141        DataType::Float32 => map_to_f32(col).map(to_box),
142        DataType::UInt64 => map_to_uint64(col).map(to_box),
143        DataType::UInt32 => map_to_uint32(col).map(to_box),
144        DataType::Int64 => map_to_int64(col).map(to_box),
145        DataType::Int32 => map_to_int32(col).map(to_box),
146    }
147}
148
149fn map_to_f64(col: &dyn Array) -> Result<Float64Array> {
150    match col.data_type() {
151        &ArrowDataType::Binary => {
152            binary_to_target_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
153        }
154        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
155            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
156            &ArrowDataType::Float64,
157        )),
158        dt => Err(anyhow!("Can't convert {:?} to f64", dt)),
159    }
160}
161
162fn map_to_f32(col: &dyn Array) -> Result<Float32Array> {
163    match col.data_type() {
164        &ArrowDataType::Binary => {
165            binary_to_target_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
166        }
167        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
168            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
169            &ArrowDataType::Float32,
170        )),
171        dt => Err(anyhow!("Can't convert {:?} to f32", dt)),
172    }
173}
174
175fn map_to_uint64(col: &dyn Array) -> Result<UInt64Array> {
176    match col.data_type() {
177        &ArrowDataType::Binary => {
178            binary_to_target_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
179        }
180        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
181            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
182            &ArrowDataType::UInt64,
183        )),
184        dt => Err(anyhow!("Can't convert {:?} to uint64", dt)),
185    }
186}
187
188fn map_to_uint32(col: &dyn Array) -> Result<UInt32Array> {
189    match col.data_type() {
190        &ArrowDataType::Binary => {
191            binary_to_target_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
192        }
193        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
194            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
195            &ArrowDataType::UInt32,
196        )),
197        dt => Err(anyhow!("Can't convert {:?} to uint32", dt)),
198    }
199}
200
201fn map_to_int64(col: &dyn Array) -> Result<Int64Array> {
202    match col.data_type() {
203        &ArrowDataType::Binary => {
204            binary_to_target_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
205        }
206        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
207            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
208            &ArrowDataType::Int64,
209        )),
210        dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
211    }
212}
213
214fn map_to_int32(col: &dyn Array) -> Result<Int32Array> {
215    match col.data_type() {
216        &ArrowDataType::Binary => {
217            binary_to_target_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
218        }
219        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
220            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
221            &ArrowDataType::Int32,
222        )),
223        dt => Err(anyhow!("Can't convert {:?} to int32", dt)),
224    }
225}
226
227fn binary_to_target_array<T: NativeType + TryFrom<U256>>(
228    src: &BinaryArray<i32>,
229) -> Result<PrimitiveArray<T>> {
230    let mut out = MutablePrimitiveArray::with_capacity(src.len());
231
232    for val in src.iter() {
233        out.push(val.map(binary_to_target).transpose()?);
234    }
235
236    Ok(out.into())
237}
238
239fn binary_to_target<T: TryFrom<U256>>(src: &[u8]) -> Result<T> {
240    let big_num = U256::from_be_slice(src);
241    big_num
242        .try_into()
243        .map_err(|_e| anyhow!("failed to cast number to requested type"))
244}