1use std::collections::BTreeMap;
2
3use alloy_primitives::I256;
4use anyhow::{anyhow, Context, Result};
5use hypersync_schema::ArrowChunk;
6use polars_arrow::array::{
7 Array, BinaryArray, Float32Array, Float64Array, Int128Array, Int256Array, Int256Vec,
8 Int32Array, Int64Array, MutablePrimitiveArray, MutableUtf8Array, PrimitiveArray, UInt32Array,
9 UInt64Array, Utf8Array,
10};
11use polars_arrow::compute::cast::CastOptionsImpl as CastOptions;
12use polars_arrow::compute::{self, cast};
13use polars_arrow::datatypes::{ArrowDataType, ArrowSchema as Schema, Field};
14use polars_arrow::types::{i256 as Decimal, NativeType};
15use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
16use ruint::aliases::U256;
17use serde::{Deserialize, Serialize};
18
19use crate::ArrowBatch;
20
21#[derive(Default, Debug, Clone, Serialize, Deserialize)]
24pub struct ColumnMapping {
25 #[serde(default)]
27 pub block: BTreeMap<String, DataType>,
28 #[serde(default)]
30 pub transaction: BTreeMap<String, DataType>,
31 #[serde(default)]
33 pub log: BTreeMap<String, DataType>,
34 #[serde(default)]
36 pub trace: BTreeMap<String, DataType>,
37 #[serde(default)]
39 pub decoded_log: BTreeMap<String, DataType>,
40}
41
42#[allow(missing_docs)]
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
46#[serde(rename_all = "lowercase")]
47pub enum DataType {
48 Float64,
49 Float32,
50 UInt64,
51 UInt32,
52 Int64,
53 Int32,
54 IntStr,
55 Decimal256,
56 Decimal128,
57}
58
59impl From<DataType> for ArrowDataType {
60 fn from(value: DataType) -> Self {
61 match value {
62 DataType::Float64 => Self::Float64,
63 DataType::Float32 => Self::Float32,
64 DataType::UInt64 => Self::UInt64,
65 DataType::UInt32 => Self::UInt32,
66 DataType::Int64 => Self::Int64,
67 DataType::Int32 => Self::Int32,
68 DataType::IntStr => Self::Utf8,
69 DataType::Decimal256 => Self::Decimal256(76, 0),
70 DataType::Decimal128 => Self::Decimal(38, 0),
71 }
72 }
73}
74
75pub fn apply_to_batch(
76 batch: &ArrowBatch,
77 mapping: &BTreeMap<String, DataType>,
78) -> Result<ArrowBatch> {
79 if mapping.is_empty() {
80 return Ok(batch.clone());
81 }
82
83 let (fields, cols) = batch
84 .chunk
85 .columns()
86 .par_iter()
87 .zip(batch.schema.fields.par_iter())
88 .map(|(col, field)| {
89 let col = match mapping.get(&field.name) {
90 Some(&dt) => {
91 if field.name == "l1_fee_scalar" {
92 map_l1_fee_scalar(&**col, dt)
93 .context(format!("apply cast to column '{}'", field.name))?
94 } else {
95 map_column(&**col, dt)
96 .context(format!("apply cast to colum '{}'", field.name))?
97 }
98 }
99 None => col.clone(),
100 };
101
102 Ok((
103 Field::new(
104 field.name.clone(),
105 col.data_type().clone(),
106 field.is_nullable,
107 ),
108 col,
109 ))
110 })
111 .collect::<Result<(Vec<_>, Vec<_>)>>()?;
112
113 Ok(ArrowBatch {
114 chunk: ArrowChunk::new(cols).into(),
115 schema: Schema::from(fields).into(),
116 })
117}
118
119pub fn map_l1_fee_scalar(
120 col: &dyn Array,
121 target_data_type: DataType,
122) -> Result<Box<dyn Array + 'static>> {
123 let col = col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
124 let col = Float64Array::from_iter(
125 col.iter()
126 .map(|v| v.map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())),
127 );
128
129 let arr = compute::cast::cast(
130 &*to_box(col),
131 &target_data_type.into(),
132 CastOptions {
133 wrapped: false,
134 partial: false,
135 },
136 )
137 .with_context(|| anyhow!("failed to l1_fee_scalar to {:?}", target_data_type))?;
138
139 Ok(arr)
140}
141
142fn to_box<T: Array>(arr: T) -> Box<dyn Array> {
143 Box::new(arr)
144}
145
146fn map_column(col: &dyn Array, target_data_type: DataType) -> Result<Box<dyn Array + 'static>> {
147 match target_data_type {
148 DataType::Float64 => map_to_f64(col).map(to_box),
149 DataType::Float32 => map_to_f32(col).map(to_box),
150 DataType::UInt64 => map_to_uint64(col).map(to_box),
151 DataType::UInt32 => map_to_uint32(col).map(to_box),
152 DataType::Int64 => map_to_int64(col).map(to_box),
153 DataType::Int32 => map_to_int32(col).map(to_box),
154 DataType::IntStr => map_to_int_str(col).map(to_box),
155 DataType::Decimal256 => map_to_decimal(col).map(to_box),
156 DataType::Decimal128 => map_to_decimal128(col).map(to_box),
157 }
158}
159
160fn map_to_decimal(col: &dyn Array) -> Result<Int256Array> {
161 match col.data_type() {
162 &ArrowDataType::Binary => {
163 binary_to_decimal_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
164 }
165 dt => Err(anyhow!("Can't convert {:?} to decimal", dt)),
166 }
167}
168
169fn binary_to_decimal_array(arr: &BinaryArray<i32>) -> Result<Int256Array> {
170 let mut out = Int256Vec::with_capacity(arr.len());
171
172 for val in arr.iter() {
173 out.push(val.map(binary_to_decimal).transpose()?);
174 }
175
176 Ok(out.into())
177}
178
179fn binary_to_decimal(binary: &[u8]) -> Result<Decimal> {
180 let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
181 let decimal = Decimal::from_be_bytes(big_num.to_be_bytes::<32>());
182
183 Ok(decimal)
184}
185
186fn map_to_int_str(col: &dyn Array) -> Result<Utf8Array<i32>> {
187 match col.data_type() {
188 &ArrowDataType::Binary => {
189 binary_to_int_str_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
190 }
191 dt => Err(anyhow!("Can't convert {:?} to intstr", dt)),
192 }
193}
194
195fn binary_to_int_str_array(arr: &BinaryArray<i32>) -> Result<Utf8Array<i32>> {
196 let mut out = MutableUtf8Array::with_capacity(arr.len());
197
198 for val in arr.iter() {
199 out.push(val.map(binary_to_int_str).transpose()?);
200 }
201
202 Ok(out.into())
203}
204
205fn binary_to_int_str(binary: &[u8]) -> Result<String> {
206 let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
207 Ok(format!("{big_num}"))
208}
209
210fn map_to_f64(col: &dyn Array) -> Result<Float64Array> {
211 match col.data_type() {
212 &ArrowDataType::Binary => binary_to_target_array(
213 col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
214 binary_to_f64,
215 ),
216 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
217 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
218 &ArrowDataType::Float64,
219 )),
220 dt => Err(anyhow!("Can't convert {:?} to f64", dt)),
221 }
222}
223
224fn map_to_f32(col: &dyn Array) -> Result<Float32Array> {
225 match col.data_type() {
226 &ArrowDataType::Binary => binary_to_target_array(
227 col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
228 binary_to_f32,
229 ),
230 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
231 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
232 &ArrowDataType::Float32,
233 )),
234 dt => Err(anyhow!("Can't convert {:?} to f32", dt)),
235 }
236}
237
238fn map_to_uint64(col: &dyn Array) -> Result<UInt64Array> {
239 match col.data_type() {
240 &ArrowDataType::Binary => binary_to_target_array(
241 col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
242 signed_binary_to_target::<u64>,
243 ),
244 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
245 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
246 &ArrowDataType::UInt64,
247 )),
248 dt => Err(anyhow!("Can't convert {:?} to uint64", dt)),
249 }
250}
251
252fn map_to_uint32(col: &dyn Array) -> Result<UInt32Array> {
253 match col.data_type() {
254 &ArrowDataType::Binary => binary_to_target_array(
255 col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
256 signed_binary_to_target::<u32>,
257 ),
258 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
259 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
260 &ArrowDataType::UInt32,
261 )),
262 dt => Err(anyhow!("Can't convert {:?} to uint32", dt)),
263 }
264}
265
266fn map_to_decimal128(col: &dyn Array) -> Result<Int128Array> {
267 match col.data_type() {
268 &ArrowDataType::Binary => binary_to_target_array(
269 col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
270 signed_binary_to_target::<i128>,
271 ),
272 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
273 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
274 &ArrowDataType::Decimal(38, 0),
275 )),
276 dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
277 }
278}
279
280fn map_to_int64(col: &dyn Array) -> Result<Int64Array> {
281 match col.data_type() {
282 &ArrowDataType::Binary => binary_to_target_array(
283 col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
284 signed_binary_to_target::<i64>,
285 ),
286 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
287 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
288 &ArrowDataType::Int64,
289 )),
290 dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
291 }
292}
293
294fn map_to_int32(col: &dyn Array) -> Result<Int32Array> {
295 match col.data_type() {
296 &ArrowDataType::Binary => binary_to_target_array(
297 col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
298 signed_binary_to_target::<i32>,
299 ),
300 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
301 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
302 &ArrowDataType::Int32,
303 )),
304 dt => Err(anyhow!("Can't convert {:?} to int32", dt)),
305 }
306}
307
308fn binary_to_target_array<T: NativeType>(
309 src: &BinaryArray<i32>,
310 convert: fn(&[u8]) -> Result<T>,
311) -> Result<PrimitiveArray<T>> {
312 let mut out = MutablePrimitiveArray::with_capacity(src.len());
313
314 for val in src.iter() {
315 out.push(val.map(convert).transpose()?);
316 }
317
318 Ok(out.into())
319}
320
321fn signed_binary_to_target<T: TryFrom<I256>>(src: &[u8]) -> Result<T> {
322 let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
323
324 big_num
325 .try_into()
326 .map_err(|_e| anyhow!("failed to cast number to requested signed type"))
327}
328
329fn binary_to_f64(src: &[u8]) -> Result<f64> {
331 let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
332
333 let x = f64::from(U256::try_from(big_num.abs()).unwrap());
334
335 if !big_num.is_negative() {
336 Ok(x)
337 } else {
338 Ok(-x)
339 }
340}
341
342fn binary_to_f32(src: &[u8]) -> Result<f32> {
344 let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
345
346 let x = f32::from(U256::try_from(big_num.abs()).unwrap());
347
348 if !big_num.is_negative() {
349 Ok(x)
350 } else {
351 Ok(-x)
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a set of `i64` edge values through every binary decoder.
    #[test]
    fn test_signed_binary_to_target() {
        const RAW_INPUT: &[i64] = &[-69, 0, 69, -1, 1, i64::MAX, i64::MIN];

        RAW_INPUT.iter().copied().for_each(|num| {
            let expected = I256::try_from(num).unwrap();
            let bytes_buf = expected.to_be_bytes::<32>();
            let bytes = bytes_buf.as_slice();

            // Integer conversion returns the original number.
            let as_i64 = signed_binary_to_target::<i64>(bytes).unwrap();
            assert_eq!(i64::try_from(expected).unwrap(), as_i64);

            // Float conversion, truncated back to an integer, matches.
            let as_f64 = binary_to_f64(bytes).unwrap();
            assert_eq!(I256::try_from(as_f64 as i64).unwrap(), expected);

            // Decimal string rendering matches the i64's own formatting.
            let as_str = binary_to_int_str(bytes).unwrap();
            assert_eq!(as_str, num.to_string());

            // Arrow i256 keeps identical bytes and identical Display output.
            let as_decimal = binary_to_decimal(bytes).unwrap();
            assert_eq!(as_decimal.to_be_bytes(), bytes);
            assert_eq!(as_decimal.to_string(), expected.to_string());
        });
    }
}