hypersync_client/
column_mapping.rs

1use std::collections::BTreeMap;
2
3use alloy_primitives::I256;
4use anyhow::{anyhow, Context, Result};
5use hypersync_schema::ArrowChunk;
6use polars_arrow::array::{
7    Array, BinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, MutablePrimitiveArray,
8    MutableUtf8Array, PrimitiveArray, UInt32Array, UInt64Array, Utf8Array,
9};
10use polars_arrow::compute::cast::CastOptionsImpl as CastOptions;
11use polars_arrow::compute::{self, cast};
12use polars_arrow::datatypes::{ArrowDataType, ArrowSchema as Schema, Field};
13use polars_arrow::types::NativeType;
14use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
15use ruint::aliases::U256;
16use serde::{Deserialize, Serialize};
17
18use crate::ArrowBatch;
19
20/// Column mapping for stream function output.
21/// It lets you map columns you want into the DataTypes you want.
22#[derive(Default, Debug, Clone, Serialize, Deserialize)]
23pub struct ColumnMapping {
24    /// Mapping for block data.
25    #[serde(default)]
26    pub block: BTreeMap<String, DataType>,
27    /// Mapping for transaction data.
28    #[serde(default)]
29    pub transaction: BTreeMap<String, DataType>,
30    /// Mapping for log data.
31    #[serde(default)]
32    pub log: BTreeMap<String, DataType>,
33    /// Mapping for trace data.
34    #[serde(default)]
35    pub trace: BTreeMap<String, DataType>,
36    /// Mapping for decoded log data.
37    #[serde(default)]
38    pub decoded_log: BTreeMap<String, DataType>,
39}
40
41#[allow(missing_docs)]
42/// `DataType` is an enumeration representing the different data types that can be used in the column mapping.
43/// Each variant corresponds to a specific data type.
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
45#[serde(rename_all = "lowercase")]
46pub enum DataType {
47    Float64,
48    Float32,
49    UInt64,
50    UInt32,
51    Int64,
52    Int32,
53    IntStr,
54}
55
56impl From<DataType> for ArrowDataType {
57    fn from(value: DataType) -> Self {
58        match value {
59            DataType::Float64 => Self::Float64,
60            DataType::Float32 => Self::Float32,
61            DataType::UInt64 => Self::UInt64,
62            DataType::UInt32 => Self::UInt32,
63            DataType::Int64 => Self::Int64,
64            DataType::Int32 => Self::Int32,
65            DataType::IntStr => Self::Utf8,
66        }
67    }
68}
69
70pub fn apply_to_batch(
71    batch: &ArrowBatch,
72    mapping: &BTreeMap<String, DataType>,
73) -> Result<ArrowBatch> {
74    if mapping.is_empty() {
75        return Ok(batch.clone());
76    }
77
78    let (fields, cols) = batch
79        .chunk
80        .columns()
81        .par_iter()
82        .zip(batch.schema.fields.par_iter())
83        .map(|(col, field)| {
84            let col = match mapping.get(&field.name) {
85                Some(&dt) => {
86                    if field.name == "l1_fee_scalar" {
87                        map_l1_fee_scalar(&**col, dt)
88                            .context(format!("apply cast to column '{}'", field.name))?
89                    } else {
90                        map_column(&**col, dt)
91                            .context(format!("apply cast to colum '{}'", field.name))?
92                    }
93                }
94                None => col.clone(),
95            };
96
97            Ok((
98                Field::new(
99                    field.name.clone(),
100                    col.data_type().clone(),
101                    field.is_nullable,
102                ),
103                col,
104            ))
105        })
106        .collect::<Result<(Vec<_>, Vec<_>)>>()?;
107
108    Ok(ArrowBatch {
109        chunk: ArrowChunk::new(cols).into(),
110        schema: Schema::from(fields).into(),
111    })
112}
113
114pub fn map_l1_fee_scalar(
115    col: &dyn Array,
116    target_data_type: DataType,
117) -> Result<Box<dyn Array + 'static>> {
118    let col = col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
119    let col = Float64Array::from_iter(
120        col.iter()
121            .map(|v| v.map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())),
122    );
123
124    let arr = compute::cast::cast(
125        &*to_box(col),
126        &target_data_type.into(),
127        CastOptions {
128            wrapped: false,
129            partial: false,
130        },
131    )
132    .with_context(|| anyhow!("failed to l1_fee_scalar to {:?}", target_data_type))?;
133
134    Ok(arr)
135}
136
137fn to_box<T: Array>(arr: T) -> Box<dyn Array> {
138    Box::new(arr)
139}
140
141fn map_column(col: &dyn Array, target_data_type: DataType) -> Result<Box<dyn Array + 'static>> {
142    match target_data_type {
143        DataType::Float64 => map_to_f64(col).map(to_box),
144        DataType::Float32 => map_to_f32(col).map(to_box),
145        DataType::UInt64 => map_to_uint64(col).map(to_box),
146        DataType::UInt32 => map_to_uint32(col).map(to_box),
147        DataType::Int64 => map_to_int64(col).map(to_box),
148        DataType::Int32 => map_to_int32(col).map(to_box),
149        DataType::IntStr => map_to_int_str(col).map(to_box),
150    }
151}
152
153fn map_to_int_str(col: &dyn Array) -> Result<Utf8Array<i32>> {
154    match col.data_type() {
155        &ArrowDataType::Binary => {
156            binary_to_int_str_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
157        }
158        dt => Err(anyhow!("Can't convert {:?} to intstr", dt)),
159    }
160}
161
162fn binary_to_int_str_array(arr: &BinaryArray<i32>) -> Result<Utf8Array<i32>> {
163    let mut out = MutableUtf8Array::with_capacity(arr.len());
164
165    for val in arr.iter() {
166        out.push(val.map(binary_to_int_str).transpose()?);
167    }
168
169    Ok(out.into())
170}
171
172fn binary_to_int_str(binary: &[u8]) -> Result<String> {
173    let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
174    Ok(format!("{}", big_num))
175}
176
177fn map_to_f64(col: &dyn Array) -> Result<Float64Array> {
178    match col.data_type() {
179        &ArrowDataType::Binary => binary_to_target_array(
180            col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
181            binary_to_f64,
182        ),
183        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
184            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
185            &ArrowDataType::Float64,
186        )),
187        dt => Err(anyhow!("Can't convert {:?} to f64", dt)),
188    }
189}
190
191fn map_to_f32(col: &dyn Array) -> Result<Float32Array> {
192    match col.data_type() {
193        &ArrowDataType::Binary => binary_to_target_array(
194            col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
195            binary_to_f32,
196        ),
197        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
198            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
199            &ArrowDataType::Float32,
200        )),
201        dt => Err(anyhow!("Can't convert {:?} to f32", dt)),
202    }
203}
204
205fn map_to_uint64(col: &dyn Array) -> Result<UInt64Array> {
206    match col.data_type() {
207        &ArrowDataType::Binary => binary_to_target_array(
208            col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
209            signed_binary_to_target::<u64>,
210        ),
211        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
212            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
213            &ArrowDataType::UInt64,
214        )),
215        dt => Err(anyhow!("Can't convert {:?} to uint64", dt)),
216    }
217}
218
219fn map_to_uint32(col: &dyn Array) -> Result<UInt32Array> {
220    match col.data_type() {
221        &ArrowDataType::Binary => binary_to_target_array(
222            col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
223            signed_binary_to_target::<u32>,
224        ),
225        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
226            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
227            &ArrowDataType::UInt32,
228        )),
229        dt => Err(anyhow!("Can't convert {:?} to uint32", dt)),
230    }
231}
232
233fn map_to_int64(col: &dyn Array) -> Result<Int64Array> {
234    match col.data_type() {
235        &ArrowDataType::Binary => binary_to_target_array(
236            col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
237            signed_binary_to_target::<i64>,
238        ),
239        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
240            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
241            &ArrowDataType::Int64,
242        )),
243        dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
244    }
245}
246
247fn map_to_int32(col: &dyn Array) -> Result<Int32Array> {
248    match col.data_type() {
249        &ArrowDataType::Binary => binary_to_target_array(
250            col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
251            signed_binary_to_target::<i32>,
252        ),
253        &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
254            col.as_any().downcast_ref::<UInt64Array>().unwrap(),
255            &ArrowDataType::Int32,
256        )),
257        dt => Err(anyhow!("Can't convert {:?} to int32", dt)),
258    }
259}
260
261fn binary_to_target_array<T: NativeType>(
262    src: &BinaryArray<i32>,
263    convert: fn(&[u8]) -> Result<T>,
264) -> Result<PrimitiveArray<T>> {
265    let mut out = MutablePrimitiveArray::with_capacity(src.len());
266
267    for val in src.iter() {
268        out.push(val.map(convert).transpose()?);
269    }
270
271    Ok(out.into())
272}
273
274fn signed_binary_to_target<T: TryFrom<I256>>(src: &[u8]) -> Result<T> {
275    let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
276
277    big_num
278        .try_into()
279        .map_err(|_e| anyhow!("failed to cast number to requested signed type"))
280}
281
282// Special case for float because floats don't implement TryFrom<I256>
283fn binary_to_f64(src: &[u8]) -> Result<f64> {
284    let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
285
286    let x = f64::from(U256::try_from(big_num.abs()).unwrap());
287
288    if !big_num.is_negative() {
289        Ok(x)
290    } else {
291        Ok(-x)
292    }
293}
294
295// Special case for float because floats don't implement TryFrom<I256>
296fn binary_to_f32(src: &[u8]) -> Result<f32> {
297    let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
298
299    let x = f32::from(U256::try_from(big_num.abs()).unwrap());
300
301    if !big_num.is_negative() {
302        Ok(x)
303    } else {
304        Ok(-x)
305    }
306}
307
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a handful of small signed values through the integer,
    /// float and string decoders.
    #[test]
    fn test_signed_binary_to_target() {
        for input_num in [-69i64, 0, 69, -1, 1] {
            let input = I256::try_from(input_num).unwrap();
            // 32-byte big-endian encoding shared by all three decoders.
            let encoded = input.to_be_bytes::<32>();
            let bytes = encoded.as_slice();

            let output = signed_binary_to_target::<i64>(bytes).unwrap();
            assert_eq!(i64::try_from(input).unwrap(), output);

            let float_output = binary_to_f64(bytes).unwrap();
            assert_eq!(I256::try_from(float_output as i64).unwrap(), input);

            let string_output = binary_to_int_str(bytes).unwrap();
            assert_eq!(string_output, input_num.to_string());
        }
    }
}
329}