hypersync_client/
column_mapping.rs

1use std::collections::BTreeMap;
2
3use alloy_primitives::I256;
4use anyhow::{anyhow, Context, Result};
5use hypersync_schema::ArrowChunk;
6use polars_arrow::array::{
7    Array, BinaryArray, Float32Array, Float64Array, Int128Array, Int256Array, Int256Vec,
8    Int32Array, Int64Array, MutablePrimitiveArray, MutableUtf8Array, PrimitiveArray, UInt32Array,
9    UInt64Array, Utf8Array,
10};
11use polars_arrow::compute::cast::CastOptionsImpl as CastOptions;
12use polars_arrow::compute::{self, cast};
13use polars_arrow::datatypes::{ArrowDataType, ArrowSchema as Schema, Field};
14use polars_arrow::types::{i256 as Decimal, NativeType};
15use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
16use ruint::aliases::U256;
17use serde::{Deserialize, Serialize};
18
19use crate::ArrowBatch;
20
/// Column mapping for stream function output.
///
/// It lets you choose, per table, which columns should be converted and which
/// [`DataType`] each of them should be converted into. Every map is keyed by
/// column name and is optional when deserializing: a missing entry falls back
/// to an empty map through `#[serde(default)]`.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct ColumnMapping {
    /// Mapping for block data (column name -> target type).
    #[serde(default)]
    pub block: BTreeMap<String, DataType>,
    /// Mapping for transaction data (column name -> target type).
    #[serde(default)]
    pub transaction: BTreeMap<String, DataType>,
    /// Mapping for log data (column name -> target type).
    #[serde(default)]
    pub log: BTreeMap<String, DataType>,
    /// Mapping for trace data (column name -> target type).
    #[serde(default)]
    pub trace: BTreeMap<String, DataType>,
    /// Mapping for decoded log data (column name -> target type).
    #[serde(default)]
    pub decoded_log: BTreeMap<String, DataType>,
}
41
#[allow(missing_docs)]
/// `DataType` is an enumeration representing the different data types that can be used in the column mapping.
/// Each variant corresponds to a specific data type.
///
/// Variants are (de)serialized by serde under their lowercased names
/// (for example `float64`, `intstr`, `decimal256`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DataType {
    /// 64-bit float; corresponds to Arrow `Float64`.
    Float64,
    /// 32-bit float; corresponds to Arrow `Float32`.
    Float32,
    /// Unsigned 64-bit integer; corresponds to Arrow `UInt64`.
    UInt64,
    /// Unsigned 32-bit integer; corresponds to Arrow `UInt32`.
    UInt32,
    /// Signed 64-bit integer; corresponds to Arrow `Int64`.
    Int64,
    /// Signed 32-bit integer; corresponds to Arrow `Int32`.
    Int32,
    /// Integer rendered as a string; corresponds to Arrow `Utf8`.
    IntStr,
    /// 256-bit decimal; corresponds to Arrow `Decimal256(76, 0)`.
    Decimal256,
    /// 128-bit decimal; corresponds to Arrow `Decimal(38, 0)`.
    Decimal128,
}
58
59impl From<DataType> for ArrowDataType {
60    fn from(value: DataType) -> Self {
61        match value {
62            DataType::Float64 => Self::Float64,
63            DataType::Float32 => Self::Float32,
64            DataType::UInt64 => Self::UInt64,
65            DataType::UInt32 => Self::UInt32,
66            DataType::Int64 => Self::Int64,
67            DataType::Int32 => Self::Int32,
68            DataType::IntStr => Self::Utf8,
69            DataType::Decimal256 => Self::Decimal256(76, 0),
70            DataType::Decimal128 => Self::Decimal(38, 0),
71        }
72    }
73}
74
75pub fn apply_to_batch(
76    batch: &ArrowBatch,
77    mapping: &BTreeMap<String, DataType>,
78) -> Result<ArrowBatch> {
79    if mapping.is_empty() {
80        return Ok(batch.clone());
81    }
82
83    let (fields, cols) = batch
84        .chunk
85        .columns()
86        .par_iter()
87        .zip(batch.schema.fields.par_iter())
88        .map(|(col, field)| {
89            let col = match mapping.get(&field.name) {
90                Some(&dt) => {
91                    if field.name == "l1_fee_scalar" {
92                        map_l1_fee_scalar(&**col, dt)
93                            .context(format!("apply cast to column '{}'", field.name))?
94                    } else {
95                        map_column(&**col, dt)
96                            .context(format!("apply cast to colum '{}'", field.name))?
97                    }
98                }
99                None => col.clone(),
100            };
101
102            Ok((
103                Field::new(
104                    field.name.clone(),
105                    col.data_type().clone(),
106                    field.is_nullable,
107                ),
108                col,
109            ))
110        })
111        .collect::<Result<(Vec<_>, Vec<_>)>>()?;
112
113    Ok(ArrowBatch {
114        chunk: ArrowChunk::new(cols).into(),
115        schema: Schema::from(fields).into(),
116    })
117}
118
119pub fn map_l1_fee_scalar(
120    col: &dyn Array,
121    target_data_type: DataType,
122) -> Result<Box<dyn Array + 'static>> {
123    let col = col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
124    let col = Float64Array::from_iter(
125        col.iter()
126            .map(|v| v.map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())),
127    );
128
129    let arr = compute::cast::cast(
130        &*to_box(col),
131        &target_data_type.into(),
132        CastOptions {
133            wrapped: false,
134            partial: false,
135        },
136    )
137    .with_context(|| anyhow!("failed to l1_fee_scalar to {:?}", target_data_type))?;
138
139    Ok(arr)
140}
141
142fn to_box<T: Array>(arr: T) -> Box<dyn Array> {
143    Box::new(arr)
144}
145
146fn map_column(col: &dyn Array, target_data_type: DataType) -> Result<Box<dyn Array + 'static>> {
147    match target_data_type {
148        DataType::Float64 => map_to_f64(col).map(to_box),
149        DataType::Float32 => map_to_f32(col).map(to_box),
150        DataType::UInt64 => map_to_uint64(col).map(to_box),
151        DataType::UInt32 => map_to_uint32(col).map(to_box),
152        DataType::Int64 => map_to_int64(col).map(to_box),
153        DataType::Int32 => map_to_int32(col).map(to_box),
154        DataType::IntStr => map_to_int_str(col).map(to_box),
155        DataType::Decimal256 => map_to_decimal(col).map(to_box),
156        DataType::Decimal128 => map_to_decimal128(col).map(to_box),
157    }
158}
159
160fn map_to_decimal(col: &dyn Array) -> Result<Int256Array> {
161    match col.data_type() {
162        &ArrowDataType::Binary => {
163            binary_to_decimal_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
164        }
165        dt => Err(anyhow!("Can't convert {:?} to decimal", dt)),
166    }
167}
168
169fn binary_to_decimal_array(arr: &BinaryArray<i32>) -> Result<Int256Array> {
170    let mut out = Int256Vec::with_capacity(arr.len());
171
172    for val in arr.iter() {
173        out.push(val.map(binary_to_decimal).transpose()?);
174    }
175
176    Ok(out.into())
177}
178
179fn binary_to_decimal(binary: &[u8]) -> Result<Decimal> {
180    let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
181    let decimal = Decimal::from_be_bytes(big_num.to_be_bytes::<32>());
182
183    Ok(decimal)
184}
185
186fn map_to_int_str(col: &dyn Array) -> Result<Utf8Array<i32>> {
187    match col.data_type() {
188        &ArrowDataType::Binary => {
189            binary_to_int_str_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
190        }
191        dt => Err(anyhow!("Can't convert {:?} to intstr", dt)),
192    }
193}
194
195fn binary_to_int_str_array(arr: &BinaryArray<i32>) -> Result<Utf8Array<i32>> {
196    let mut out = MutableUtf8Array::with_capacity(arr.len());
197
198    for val in arr.iter() {
199        out.push(val.map(binary_to_int_str).transpose()?);
200    }
201
202    Ok(out.into())
203}
204
205fn binary_to_int_str(binary: &[u8]) -> Result<String> {
206    let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
207    Ok(format!("{big_num}"))
208}
209
210fn map_to_f64(col: &dyn Array) -> Result<Float64Array> {
211    match col.data_type() {
212        &ArrowDataType::Binary => binary_to_target_array(
213            col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
214            binary_to_f64,
215        ),
216        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
217            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
218            &ArrowDataType::Float64,
219        )),
220        dt => Err(anyhow!("Can't convert {:?} to f64", dt)),
221    }
222}
223
224fn map_to_f32(col: &dyn Array) -> Result<Float32Array> {
225    match col.data_type() {
226        &ArrowDataType::Binary => binary_to_target_array(
227            col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
228            binary_to_f32,
229        ),
230        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
231            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
232            &ArrowDataType::Float32,
233        )),
234        dt => Err(anyhow!("Can't convert {:?} to f32", dt)),
235    }
236}
237
238fn map_to_uint64(col: &dyn Array) -> Result<UInt64Array> {
239    match col.data_type() {
240        &ArrowDataType::Binary => binary_to_target_array(
241            col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
242            signed_binary_to_target::<u64>,
243        ),
244        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
245            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
246            &ArrowDataType::UInt64,
247        )),
248        dt => Err(anyhow!("Can't convert {:?} to uint64", dt)),
249    }
250}
251
252fn map_to_uint32(col: &dyn Array) -> Result<UInt32Array> {
253    match col.data_type() {
254        &ArrowDataType::Binary => binary_to_target_array(
255            col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
256            signed_binary_to_target::<u32>,
257        ),
258        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
259            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
260            &ArrowDataType::UInt32,
261        )),
262        dt => Err(anyhow!("Can't convert {:?} to uint32", dt)),
263    }
264}
265
266fn map_to_decimal128(col: &dyn Array) -> Result<Int128Array> {
267    match col.data_type() {
268        &ArrowDataType::Binary => binary_to_target_array(
269            col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
270            signed_binary_to_target::<i128>,
271        ),
272        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
273            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
274            &ArrowDataType::Decimal(38, 0),
275        )),
276        dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
277    }
278}
279
280fn map_to_int64(col: &dyn Array) -> Result<Int64Array> {
281    match col.data_type() {
282        &ArrowDataType::Binary => binary_to_target_array(
283            col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
284            signed_binary_to_target::<i64>,
285        ),
286        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
287            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
288            &ArrowDataType::Int64,
289        )),
290        dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
291    }
292}
293
294fn map_to_int32(col: &dyn Array) -> Result<Int32Array> {
295    match col.data_type() {
296        &ArrowDataType::Binary => binary_to_target_array(
297            col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
298            signed_binary_to_target::<i32>,
299        ),
300        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
301            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
302            &ArrowDataType::Int32,
303        )),
304        dt => Err(anyhow!("Can't convert {:?} to int32", dt)),
305    }
306}
307
308fn binary_to_target_array<T: NativeType>(
309    src: &BinaryArray<i32>,
310    convert: fn(&[u8]) -> Result<T>,
311) -> Result<PrimitiveArray<T>> {
312    let mut out = MutablePrimitiveArray::with_capacity(src.len());
313
314    for val in src.iter() {
315        out.push(val.map(convert).transpose()?);
316    }
317
318    Ok(out.into())
319}
320
321fn signed_binary_to_target<T: TryFrom<I256>>(src: &[u8]) -> Result<T> {
322    let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
323
324    big_num
325        .try_into()
326        .map_err(|_e| anyhow!("failed to cast number to requested signed type"))
327}
328
329// Special case for float because floats don't implement TryFrom<I256>
330fn binary_to_f64(src: &[u8]) -> Result<f64> {
331    let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
332
333    let x = f64::from(U256::try_from(big_num.abs()).unwrap());
334
335    if !big_num.is_negative() {
336        Ok(x)
337    } else {
338        Ok(-x)
339    }
340}
341
342// Special case for float because floats don't implement TryFrom<I256>
343fn binary_to_f32(src: &[u8]) -> Result<f32> {
344    let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
345
346    let x = f32::from(U256::try_from(big_num.abs()).unwrap());
347
348    if !big_num.is_negative() {
349        Ok(x)
350    } else {
351        Ok(-x)
352    }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds a set of `i64` values, encoded as 32-byte big-endian `I256`,
    /// through the integer, float, string and decimal converters and checks
    /// that each one reproduces the original number.
    #[test]
    fn test_signed_binary_to_target() {
        const SAMPLES: &[i64] = &[-69, 0, 69, -1, 1, i64::MAX, i64::MIN];

        for sample in SAMPLES.iter().copied() {
            let wide = I256::try_from(sample).unwrap();
            let be_bytes = wide.to_be_bytes::<32>();
            let be_slice = be_bytes.as_slice();

            // Integer conversion.
            let as_i64 = signed_binary_to_target::<i64>(be_slice).unwrap();
            assert_eq!(i64::try_from(wide).unwrap(), as_i64);

            // Float conversion, compared after casting back to i64.
            let as_f64 = binary_to_f64(be_slice).unwrap();
            assert_eq!(I256::try_from(as_f64 as i64).unwrap(), wide);

            // String conversion.
            let as_text = binary_to_int_str(be_slice).unwrap();
            assert_eq!(as_text, sample.to_string());

            // Decimal conversion: same bytes and same textual rendering.
            let as_decimal = binary_to_decimal(be_slice).unwrap();
            assert_eq!(as_decimal.to_be_bytes(), be_slice);
            assert_eq!(as_decimal.to_string(), wide.to_string());
        }
    }
}
381}