hypersync_client/column_mapping.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use alloy_primitives::I256;
4use anyhow::{anyhow, Context, Result};
5use arrow::{
6    array::{
7        Array, ArrayRef, ArrowPrimitiveType, AsArray, BinaryArray, Decimal128Array,
8        Decimal256Array, Decimal256Builder, Float32Array, Float64Array, Int32Array, Int64Array,
9        PrimitiveArray, PrimitiveBuilder, RecordBatch, StringArray, StringBuilder, UInt32Array,
10        UInt64Array,
11    },
12    compute,
13    datatypes::{i256, DataType as ArrowDataType, Field, Schema},
14};
15use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
16use ruint::aliases::U256;
17use schemars::JsonSchema;
18use serde::{Deserialize, Serialize};
19
/// Column mapping for stream function output.
/// It lets you map columns you want into the DataTypes you want.
///
/// Each map is keyed by column name and gives the [`DataType`] that column
/// should be converted to. When one of these maps is passed to `apply_to_batch`,
/// only the listed columns are converted; all other columns pass through unchanged.
// Every field carries `#[serde(default)]`, so a table omitted from the serialized
// form deserializes as an empty map, meaning no conversions for that table.
#[derive(Default, Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ColumnMapping {
    /// Mapping for block data.
    #[serde(default)]
    pub block: BTreeMap<String, DataType>,
    /// Mapping for transaction data.
    #[serde(default)]
    pub transaction: BTreeMap<String, DataType>,
    /// Mapping for log data.
    #[serde(default)]
    pub log: BTreeMap<String, DataType>,
    /// Mapping for trace data.
    #[serde(default)]
    pub trace: BTreeMap<String, DataType>,
    /// Mapping for decoded log data.
    #[serde(default)]
    pub decoded_log: BTreeMap<String, DataType>,
}
40
#[allow(missing_docs)]
/// `DataType` is an enumeration representing the different data types that can be used in the column mapping.
/// Each variant corresponds to a specific data type.
// Serialized names are the lowercase variant names (e.g. `"float64"`, `"intstr"`)
// because of `rename_all = "lowercase"`. The Arrow type each variant produces is
// defined by `From<DataType> for ArrowDataType` below.
// NOTE(review): the per-variant notes are plain `//` comments on purpose. `///`
// docs would likely be picked up by the `JsonSchema` derive and could change the
// generated schema. Confirm before converting them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum DataType {
    // Arrow `Float64`.
    Float64,
    // Arrow `Float32`.
    Float32,
    // Arrow `UInt64`.
    UInt64,
    // Arrow `UInt32`.
    UInt32,
    // Arrow `Int64`.
    Int64,
    // Arrow `Int32`.
    Int32,
    // Integer rendered as a base-10 string (Arrow `Utf8`).
    IntStr,
    // Arrow `Decimal256(76, 0)`.
    Decimal256,
    // Arrow `Decimal128(38, 0)`.
    Decimal128,
}
57
58impl From<DataType> for ArrowDataType {
59    fn from(value: DataType) -> Self {
60        match value {
61            DataType::Float64 => Self::Float64,
62            DataType::Float32 => Self::Float32,
63            DataType::UInt64 => Self::UInt64,
64            DataType::UInt32 => Self::UInt32,
65            DataType::Int64 => Self::Int64,
66            DataType::Int32 => Self::Int32,
67            DataType::IntStr => Self::Utf8,
68            DataType::Decimal256 => Self::Decimal256(76, 0),
69            DataType::Decimal128 => Self::Decimal128(38, 0),
70        }
71    }
72}
73
74pub fn apply_to_batch(
75    batch: &RecordBatch,
76    mapping: &BTreeMap<String, DataType>,
77) -> Result<RecordBatch> {
78    if mapping.is_empty() {
79        return Ok(batch.clone());
80    }
81
82    let (fields, cols) = batch
83        .columns()
84        .par_iter()
85        .zip(batch.schema().fields().par_iter())
86        .map(|(col, field)| {
87            let col = match mapping.get(field.name()) {
88                Some(&dt) => {
89                    if field.name() == "l1_fee_scalar" {
90                        map_l1_fee_scalar(&**col, dt)
91                            .context(format!("apply cast to column '{}'", field.name()))?
92                    } else {
93                        map_column(&**col, dt)
94                            .context(format!("apply cast to column '{}'", field.name()))?
95                    }
96                }
97                None => col.clone(),
98            };
99
100            Ok((
101                Field::new(
102                    field.name().clone(),
103                    col.data_type().clone(),
104                    field.is_nullable(),
105                ),
106                col,
107            ))
108        })
109        .collect::<Result<(Vec<_>, Vec<_>)>>()?;
110
111    let schema = Arc::new(Schema::new(fields));
112
113    Ok(RecordBatch::try_new(schema, cols).unwrap())
114}
115
116pub fn map_l1_fee_scalar(col: &dyn Array, target_data_type: DataType) -> Result<ArrayRef> {
117    let col = col.as_any().downcast_ref::<BinaryArray>().unwrap();
118    let col = Float64Array::from_iter(
119        col.iter()
120            .map(|v| v.map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())),
121    );
122
123    let arrow_dt = ArrowDataType::from(target_data_type);
124
125    let arr = compute::cast(&col, &arrow_dt)
126        .with_context(|| anyhow!("failed to l1_fee_scalar to {:?}", target_data_type))?;
127
128    Ok(arr)
129}
130
131fn to_array_ref<Arr: Array + 'static>(arr: Arr) -> ArrayRef {
132    Arc::new(arr)
133}
134
135fn map_column(col: &dyn Array, target_data_type: DataType) -> Result<ArrayRef> {
136    match target_data_type {
137        DataType::Float64 => map_to_f64(col).map(to_array_ref),
138        DataType::Float32 => map_to_f32(col).map(to_array_ref),
139        DataType::UInt64 => map_to_uint64(col).map(to_array_ref),
140        DataType::UInt32 => map_to_uint32(col).map(to_array_ref),
141        DataType::Int64 => map_to_int64(col).map(to_array_ref),
142        DataType::Int32 => map_to_int32(col).map(to_array_ref),
143        DataType::IntStr => map_to_int_str(col).map(to_array_ref),
144        DataType::Decimal256 => map_to_decimal(col).map(to_array_ref),
145        DataType::Decimal128 => map_to_decimal128(col).map(to_array_ref),
146    }
147}
148
149fn map_to_decimal(col: &dyn Array) -> Result<Decimal256Array> {
150    match col.data_type() {
151        &ArrowDataType::Binary => {
152            binary_to_decimal_array(col.as_any().downcast_ref::<BinaryArray>().unwrap())
153        }
154        dt => Err(anyhow!("Can't convert {:?} to decimal", dt)),
155    }
156}
157
158fn binary_to_decimal_array(arr: &BinaryArray) -> Result<Decimal256Array> {
159    let mut out = Decimal256Builder::with_capacity(arr.len());
160
161    for val in arr.iter() {
162        out.append_option(val.map(binary_to_decimal).transpose()?);
163    }
164
165    Ok(out.finish())
166}
167
168fn binary_to_decimal(binary: &[u8]) -> Result<i256> {
169    let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
170    let decimal = i256::from_be_bytes(big_num.to_be_bytes::<32>());
171
172    Ok(decimal)
173}
174
175fn map_to_int_str(col: &dyn Array) -> Result<StringArray> {
176    match col.data_type() {
177        &ArrowDataType::Binary => {
178            binary_to_int_str_array(col.as_any().downcast_ref::<BinaryArray>().unwrap())
179        }
180        dt => Err(anyhow!("Can't convert {:?} to intstr", dt)),
181    }
182}
183
184fn binary_to_int_str_array(arr: &BinaryArray) -> Result<StringArray> {
185    let mut out = StringBuilder::new();
186
187    for val in arr.iter() {
188        out.append_option(val.map(binary_to_int_str).transpose()?);
189    }
190
191    Ok(out.finish())
192}
193
194fn binary_to_int_str(binary: &[u8]) -> Result<String> {
195    let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
196    Ok(format!("{big_num}"))
197}
198
199fn map_to_f64(col: &dyn Array) -> Result<Float64Array> {
200    match col.data_type() {
201        &ArrowDataType::Binary => binary_to_target_array(col.as_binary(), binary_to_f64),
202        &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Float64)
203            .context("arrow cast")?
204            .as_primitive()
205            .clone()),
206        dt => Err(anyhow!("Can't convert {:?} to f64", dt)),
207    }
208}
209
210fn map_to_f32(col: &dyn Array) -> Result<Float32Array> {
211    match col.data_type() {
212        &ArrowDataType::Binary => binary_to_target_array(col.as_binary(), binary_to_f32),
213        &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Float32)
214            .context("arrow cast")?
215            .as_primitive()
216            .clone()),
217        dt => Err(anyhow!("Can't convert {:?} to f32", dt)),
218    }
219}
220
221fn map_to_uint64(col: &dyn Array) -> Result<UInt64Array> {
222    match col.data_type() {
223        &ArrowDataType::Binary => {
224            binary_to_target_array(col.as_binary(), signed_binary_to_target::<u64>)
225        }
226        &ArrowDataType::UInt64 => Ok(col.as_primitive().clone()),
227        dt => Err(anyhow!("Can't convert {:?} to uint64", dt)),
228    }
229}
230
231fn map_to_uint32(col: &dyn Array) -> Result<UInt32Array> {
232    match col.data_type() {
233        &ArrowDataType::Binary => {
234            binary_to_target_array(col.as_binary(), signed_binary_to_target::<u32>)
235        }
236        &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::UInt32)
237            .context("arrow cast")?
238            .as_primitive()
239            .clone()),
240        dt => Err(anyhow!("Can't convert {:?} to uint32", dt)),
241    }
242}
243
244fn map_to_decimal128(col: &dyn Array) -> Result<Decimal128Array> {
245    match col.data_type() {
246        &ArrowDataType::Binary => {
247            binary_to_target_array(col.as_binary(), signed_binary_to_target::<i128>)
248        }
249        &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Decimal128(38, 0))
250            .context("arrow cast")?
251            .as_primitive()
252            .clone()),
253        dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
254    }
255}
256
257fn map_to_int64(col: &dyn Array) -> Result<Int64Array> {
258    match col.data_type() {
259        &ArrowDataType::Binary => {
260            binary_to_target_array(col.as_binary(), signed_binary_to_target::<i64>)
261        }
262        &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Int64)
263            .context("arrow cast")?
264            .as_primitive()
265            .clone()),
266        dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
267    }
268}
269
270fn map_to_int32(col: &dyn Array) -> Result<Int32Array> {
271    match col.data_type() {
272        &ArrowDataType::Binary => {
273            binary_to_target_array(col.as_binary(), signed_binary_to_target::<i32>)
274        }
275        &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Int32)
276            .context("arrow cast")?
277            .as_primitive()
278            .clone()),
279        dt => Err(anyhow!("Can't convert {:?} to int32", dt)),
280    }
281}
282
283fn binary_to_target_array<T: ArrowPrimitiveType>(
284    src: &BinaryArray,
285    convert: fn(&[u8]) -> Result<T::Native>,
286) -> Result<PrimitiveArray<T>> {
287    let mut out = PrimitiveBuilder::<T>::with_capacity(src.len());
288
289    for val in src.iter() {
290        out.append_option(val.map(convert).transpose()?);
291    }
292
293    Ok(out.finish())
294}
295
296fn signed_binary_to_target<T: TryFrom<I256>>(src: &[u8]) -> Result<T> {
297    let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
298
299    big_num
300        .try_into()
301        .map_err(|_e| anyhow!("failed to cast number to requested signed type"))
302}
303
304// Special case for float because floats don't implement TryFrom<I256>
305fn binary_to_f64(src: &[u8]) -> Result<f64> {
306    let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
307
308    let x = f64::from(U256::try_from(big_num.abs()).unwrap());
309
310    if !big_num.is_negative() {
311        Ok(x)
312    } else {
313        Ok(-x)
314    }
315}
316
317// Special case for float because floats don't implement TryFrom<I256>
318fn binary_to_f32(src: &[u8]) -> Result<f32> {
319    let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
320
321    let x = f32::from(U256::try_from(big_num.abs()).unwrap());
322
323    if !big_num.is_negative() {
324        Ok(x)
325    } else {
326        Ok(-x)
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes boundary `i64` values as 32-byte big-endian `I256` and checks
    /// that every binary decoder in this module agrees with the input.
    #[test]
    fn test_signed_binary_to_target() {
        let samples: [i64; 7] = [-69, 0, 69, -1, 1, i64::MAX, i64::MIN];

        for sample in samples {
            let expected = I256::try_from(sample).unwrap();
            let encoded = expected.to_be_bytes::<32>();
            let bytes = encoded.as_slice();

            // Exact integer decoding.
            let decoded = signed_binary_to_target::<i64>(bytes).unwrap();
            assert_eq!(i64::try_from(expected).unwrap(), decoded);

            // Float decoding, compared after converting back to an integer.
            let as_float = binary_to_f64(bytes).unwrap();
            assert_eq!(I256::try_from(as_float as i64).unwrap(), expected);

            // Base-10 string rendering.
            let as_text = binary_to_int_str(bytes).unwrap();
            assert_eq!(as_text, sample.to_string());

            // Arrow i256: identical bytes and identical decimal rendering.
            let as_decimal = binary_to_decimal(bytes).unwrap();
            assert_eq!(as_decimal.to_be_bytes(), bytes);
            assert_eq!(as_decimal.to_string(), expected.to_string());
        }
    }
}