hypersync_client/
column_mapping.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use alloy_primitives::I256;
4use anyhow::{anyhow, Context, Result};
5use arrow::{
6    array::{
7        Array, ArrayRef, ArrowPrimitiveType, AsArray, BinaryArray, Decimal128Array,
8        Decimal256Array, Decimal256Builder, Float32Array, Float64Array, Int32Array, Int64Array,
9        PrimitiveArray, PrimitiveBuilder, RecordBatch, StringArray, StringBuilder, UInt32Array,
10        UInt64Array,
11    },
12    compute,
13    datatypes::{i256, DataType as ArrowDataType, Field, Schema},
14};
15use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
16use ruint::aliases::U256;
17use serde::{Deserialize, Serialize};
18
19/// Column mapping for stream function output.
20/// It lets you map columns you want into the DataTypes you want.
21#[derive(Default, Debug, Clone, Serialize, Deserialize)]
22pub struct ColumnMapping {
23    /// Mapping for block data.
24    #[serde(default)]
25    pub block: BTreeMap<String, DataType>,
26    /// Mapping for transaction data.
27    #[serde(default)]
28    pub transaction: BTreeMap<String, DataType>,
29    /// Mapping for log data.
30    #[serde(default)]
31    pub log: BTreeMap<String, DataType>,
32    /// Mapping for trace data.
33    #[serde(default)]
34    pub trace: BTreeMap<String, DataType>,
35    /// Mapping for decoded log data.
36    #[serde(default)]
37    pub decoded_log: BTreeMap<String, DataType>,
38}
39
40#[allow(missing_docs)]
41/// `DataType` is an enumeration representing the different data types that can be used in the column mapping.
42/// Each variant corresponds to a specific data type.
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(rename_all = "lowercase")]
45pub enum DataType {
46    Float64,
47    Float32,
48    UInt64,
49    UInt32,
50    Int64,
51    Int32,
52    IntStr,
53    Decimal256,
54    Decimal128,
55}
56
57impl From<DataType> for ArrowDataType {
58    fn from(value: DataType) -> Self {
59        match value {
60            DataType::Float64 => Self::Float64,
61            DataType::Float32 => Self::Float32,
62            DataType::UInt64 => Self::UInt64,
63            DataType::UInt32 => Self::UInt32,
64            DataType::Int64 => Self::Int64,
65            DataType::Int32 => Self::Int32,
66            DataType::IntStr => Self::Utf8,
67            DataType::Decimal256 => Self::Decimal256(76, 0),
68            DataType::Decimal128 => Self::Decimal128(38, 0),
69        }
70    }
71}
72
73pub fn apply_to_batch(
74    batch: &RecordBatch,
75    mapping: &BTreeMap<String, DataType>,
76) -> Result<RecordBatch> {
77    if mapping.is_empty() {
78        return Ok(batch.clone());
79    }
80
81    let (fields, cols) = batch
82        .columns()
83        .par_iter()
84        .zip(batch.schema().fields().par_iter())
85        .map(|(col, field)| {
86            let col = match mapping.get(field.name()) {
87                Some(&dt) => {
88                    if field.name() == "l1_fee_scalar" {
89                        map_l1_fee_scalar(&**col, dt)
90                            .context(format!("apply cast to column '{}'", field.name()))?
91                    } else {
92                        map_column(&**col, dt)
93                            .context(format!("apply cast to column '{}'", field.name()))?
94                    }
95                }
96                None => col.clone(),
97            };
98
99            Ok((
100                Field::new(
101                    field.name().clone(),
102                    col.data_type().clone(),
103                    field.is_nullable(),
104                ),
105                col,
106            ))
107        })
108        .collect::<Result<(Vec<_>, Vec<_>)>>()?;
109
110    let schema = Arc::new(Schema::new(fields));
111
112    Ok(RecordBatch::try_new(schema, cols).unwrap())
113}
114
115pub fn map_l1_fee_scalar(col: &dyn Array, target_data_type: DataType) -> Result<ArrayRef> {
116    let col = col.as_any().downcast_ref::<BinaryArray>().unwrap();
117    let col = Float64Array::from_iter(
118        col.iter()
119            .map(|v| v.map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())),
120    );
121
122    let arrow_dt = ArrowDataType::from(target_data_type);
123
124    let arr = compute::cast(&col, &arrow_dt)
125        .with_context(|| anyhow!("failed to l1_fee_scalar to {:?}", target_data_type))?;
126
127    Ok(arr)
128}
129
130fn to_array_ref<Arr: Array + 'static>(arr: Arr) -> ArrayRef {
131    Arc::new(arr)
132}
133
134fn map_column(col: &dyn Array, target_data_type: DataType) -> Result<ArrayRef> {
135    match target_data_type {
136        DataType::Float64 => map_to_f64(col).map(to_array_ref),
137        DataType::Float32 => map_to_f32(col).map(to_array_ref),
138        DataType::UInt64 => map_to_uint64(col).map(to_array_ref),
139        DataType::UInt32 => map_to_uint32(col).map(to_array_ref),
140        DataType::Int64 => map_to_int64(col).map(to_array_ref),
141        DataType::Int32 => map_to_int32(col).map(to_array_ref),
142        DataType::IntStr => map_to_int_str(col).map(to_array_ref),
143        DataType::Decimal256 => map_to_decimal(col).map(to_array_ref),
144        DataType::Decimal128 => map_to_decimal128(col).map(to_array_ref),
145    }
146}
147
148fn map_to_decimal(col: &dyn Array) -> Result<Decimal256Array> {
149    match col.data_type() {
150        &ArrowDataType::Binary => {
151            binary_to_decimal_array(col.as_any().downcast_ref::<BinaryArray>().unwrap())
152        }
153        dt => Err(anyhow!("Can't convert {:?} to decimal", dt)),
154    }
155}
156
157fn binary_to_decimal_array(arr: &BinaryArray) -> Result<Decimal256Array> {
158    let mut out = Decimal256Builder::with_capacity(arr.len());
159
160    for val in arr.iter() {
161        out.append_option(val.map(binary_to_decimal).transpose()?);
162    }
163
164    Ok(out.finish())
165}
166
167fn binary_to_decimal(binary: &[u8]) -> Result<i256> {
168    let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
169    let decimal = i256::from_be_bytes(big_num.to_be_bytes::<32>());
170
171    Ok(decimal)
172}
173
174fn map_to_int_str(col: &dyn Array) -> Result<StringArray> {
175    match col.data_type() {
176        &ArrowDataType::Binary => {
177            binary_to_int_str_array(col.as_any().downcast_ref::<BinaryArray>().unwrap())
178        }
179        dt => Err(anyhow!("Can't convert {:?} to intstr", dt)),
180    }
181}
182
183fn binary_to_int_str_array(arr: &BinaryArray) -> Result<StringArray> {
184    let mut out = StringBuilder::new();
185
186    for val in arr.iter() {
187        out.append_option(val.map(binary_to_int_str).transpose()?);
188    }
189
190    Ok(out.finish())
191}
192
193fn binary_to_int_str(binary: &[u8]) -> Result<String> {
194    let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
195    Ok(format!("{big_num}"))
196}
197
198fn map_to_f64(col: &dyn Array) -> Result<Float64Array> {
199    match col.data_type() {
200        &ArrowDataType::Binary => binary_to_target_array(col.as_binary(), binary_to_f64),
201        &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Float64)
202            .context("arrow cast")?
203            .as_primitive()
204            .clone()),
205        dt => Err(anyhow!("Can't convert {:?} to f64", dt)),
206    }
207}
208
209fn map_to_f32(col: &dyn Array) -> Result<Float32Array> {
210    match col.data_type() {
211        &ArrowDataType::Binary => binary_to_target_array(col.as_binary(), binary_to_f32),
212        &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Float32)
213            .context("arrow cast")?
214            .as_primitive()
215            .clone()),
216        dt => Err(anyhow!("Can't convert {:?} to f32", dt)),
217    }
218}
219
220fn map_to_uint64(col: &dyn Array) -> Result<UInt64Array> {
221    match col.data_type() {
222        &ArrowDataType::Binary => {
223            binary_to_target_array(col.as_binary(), signed_binary_to_target::<u64>)
224        }
225        &ArrowDataType::UInt64 => Ok(col.as_primitive().clone()),
226        dt => Err(anyhow!("Can't convert {:?} to uint64", dt)),
227    }
228}
229
230fn map_to_uint32(col: &dyn Array) -> Result<UInt32Array> {
231    match col.data_type() {
232        &ArrowDataType::Binary => {
233            binary_to_target_array(col.as_binary(), signed_binary_to_target::<u32>)
234        }
235        &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::UInt32)
236            .context("arrow cast")?
237            .as_primitive()
238            .clone()),
239        dt => Err(anyhow!("Can't convert {:?} to uint32", dt)),
240    }
241}
242
243fn map_to_decimal128(col: &dyn Array) -> Result<Decimal128Array> {
244    match col.data_type() {
245        &ArrowDataType::Binary => {
246            binary_to_target_array(col.as_binary(), signed_binary_to_target::<i128>)
247        }
248        &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Decimal128(38, 0))
249            .context("arrow cast")?
250            .as_primitive()
251            .clone()),
252        dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
253    }
254}
255
256fn map_to_int64(col: &dyn Array) -> Result<Int64Array> {
257    match col.data_type() {
258        &ArrowDataType::Binary => {
259            binary_to_target_array(col.as_binary(), signed_binary_to_target::<i64>)
260        }
261        &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Int64)
262            .context("arrow cast")?
263            .as_primitive()
264            .clone()),
265        dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
266    }
267}
268
269fn map_to_int32(col: &dyn Array) -> Result<Int32Array> {
270    match col.data_type() {
271        &ArrowDataType::Binary => {
272            binary_to_target_array(col.as_binary(), signed_binary_to_target::<i32>)
273        }
274        &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Int32)
275            .context("arrow cast")?
276            .as_primitive()
277            .clone()),
278        dt => Err(anyhow!("Can't convert {:?} to int32", dt)),
279    }
280}
281
282fn binary_to_target_array<T: ArrowPrimitiveType>(
283    src: &BinaryArray,
284    convert: fn(&[u8]) -> Result<T::Native>,
285) -> Result<PrimitiveArray<T>> {
286    let mut out = PrimitiveBuilder::<T>::with_capacity(src.len());
287
288    for val in src.iter() {
289        out.append_option(val.map(convert).transpose()?);
290    }
291
292    Ok(out.finish())
293}
294
295fn signed_binary_to_target<T: TryFrom<I256>>(src: &[u8]) -> Result<T> {
296    let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
297
298    big_num
299        .try_into()
300        .map_err(|_e| anyhow!("failed to cast number to requested signed type"))
301}
302
303// Special case for float because floats don't implement TryFrom<I256>
304fn binary_to_f64(src: &[u8]) -> Result<f64> {
305    let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
306
307    let x = f64::from(U256::try_from(big_num.abs()).unwrap());
308
309    if !big_num.is_negative() {
310        Ok(x)
311    } else {
312        Ok(-x)
313    }
314}
315
316// Special case for float because floats don't implement TryFrom<I256>
317fn binary_to_f32(src: &[u8]) -> Result<f32> {
318    let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
319
320    let x = f32::from(U256::try_from(big_num.abs()).unwrap());
321
322    if !big_num.is_negative() {
323        Ok(x)
324    } else {
325        Ok(-x)
326    }
327}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a set of `i64` values, encoded as 32-byte big-endian `I256`, through
    /// the integer, float, string and decimal converters.
    #[test]
    fn test_signed_binary_to_target() {
        const RAW_INPUT: &[i64] = &[-69, 0, 69, -1, 1, i64::MAX, i64::MIN];

        RAW_INPUT.iter().copied().for_each(|raw| {
            let expected = I256::try_from(raw).unwrap();
            let be_bytes = expected.to_be_bytes::<32>();
            let bytes = be_bytes.as_slice();

            let as_i64 = signed_binary_to_target::<i64>(bytes).unwrap();
            assert_eq!(i64::try_from(expected).unwrap(), as_i64);

            // `as i64` saturates, so i64::MAX still round-trips after rounding up in f64.
            let as_f64 = binary_to_f64(bytes).unwrap();
            assert_eq!(I256::try_from(as_f64 as i64).unwrap(), expected);

            let as_str = binary_to_int_str(bytes).unwrap();
            assert_eq!(as_str, format!("{}", raw));

            let as_decimal = binary_to_decimal(bytes).unwrap();
            assert_eq!(as_decimal.to_be_bytes(), bytes);
            assert_eq!(format!("{}", as_decimal), format!("{}", expected));
        });
    }
}