1use std::{collections::BTreeMap, sync::Arc};
2
3use alloy_primitives::I256;
4use anyhow::{anyhow, Context, Result};
5use arrow::{
6 array::{
7 Array, ArrayRef, ArrowPrimitiveType, AsArray, BinaryArray, Decimal128Array,
8 Decimal256Array, Decimal256Builder, Float32Array, Float64Array, Int32Array, Int64Array,
9 PrimitiveArray, PrimitiveBuilder, RecordBatch, StringArray, StringBuilder, UInt32Array,
10 UInt64Array,
11 },
12 compute,
13 datatypes::{i256, DataType as ArrowDataType, Field, Schema},
14};
15use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
16use ruint::aliases::U256;
17use serde::{Deserialize, Serialize};
18
19#[derive(Default, Debug, Clone, Serialize, Deserialize)]
22pub struct ColumnMapping {
23 #[serde(default)]
25 pub block: BTreeMap<String, DataType>,
26 #[serde(default)]
28 pub transaction: BTreeMap<String, DataType>,
29 #[serde(default)]
31 pub log: BTreeMap<String, DataType>,
32 #[serde(default)]
34 pub trace: BTreeMap<String, DataType>,
35 #[serde(default)]
37 pub decoded_log: BTreeMap<String, DataType>,
38}
39
40#[allow(missing_docs)]
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(rename_all = "lowercase")]
45pub enum DataType {
46 Float64,
47 Float32,
48 UInt64,
49 UInt32,
50 Int64,
51 Int32,
52 IntStr,
53 Decimal256,
54 Decimal128,
55}
56
57impl From<DataType> for ArrowDataType {
58 fn from(value: DataType) -> Self {
59 match value {
60 DataType::Float64 => Self::Float64,
61 DataType::Float32 => Self::Float32,
62 DataType::UInt64 => Self::UInt64,
63 DataType::UInt32 => Self::UInt32,
64 DataType::Int64 => Self::Int64,
65 DataType::Int32 => Self::Int32,
66 DataType::IntStr => Self::Utf8,
67 DataType::Decimal256 => Self::Decimal256(76, 0),
68 DataType::Decimal128 => Self::Decimal128(38, 0),
69 }
70 }
71}
72
73pub fn apply_to_batch(
74 batch: &RecordBatch,
75 mapping: &BTreeMap<String, DataType>,
76) -> Result<RecordBatch> {
77 if mapping.is_empty() {
78 return Ok(batch.clone());
79 }
80
81 let (fields, cols) = batch
82 .columns()
83 .par_iter()
84 .zip(batch.schema().fields().par_iter())
85 .map(|(col, field)| {
86 let col = match mapping.get(field.name()) {
87 Some(&dt) => {
88 if field.name() == "l1_fee_scalar" {
89 map_l1_fee_scalar(&**col, dt)
90 .context(format!("apply cast to column '{}'", field.name()))?
91 } else {
92 map_column(&**col, dt)
93 .context(format!("apply cast to column '{}'", field.name()))?
94 }
95 }
96 None => col.clone(),
97 };
98
99 Ok((
100 Field::new(
101 field.name().clone(),
102 col.data_type().clone(),
103 field.is_nullable(),
104 ),
105 col,
106 ))
107 })
108 .collect::<Result<(Vec<_>, Vec<_>)>>()?;
109
110 let schema = Arc::new(Schema::new(fields));
111
112 Ok(RecordBatch::try_new(schema, cols).unwrap())
113}
114
115pub fn map_l1_fee_scalar(col: &dyn Array, target_data_type: DataType) -> Result<ArrayRef> {
116 let col = col.as_any().downcast_ref::<BinaryArray>().unwrap();
117 let col = Float64Array::from_iter(
118 col.iter()
119 .map(|v| v.map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())),
120 );
121
122 let arrow_dt = ArrowDataType::from(target_data_type);
123
124 let arr = compute::cast(&col, &arrow_dt)
125 .with_context(|| anyhow!("failed to l1_fee_scalar to {:?}", target_data_type))?;
126
127 Ok(arr)
128}
129
130fn to_array_ref<Arr: Array + 'static>(arr: Arr) -> ArrayRef {
131 Arc::new(arr)
132}
133
134fn map_column(col: &dyn Array, target_data_type: DataType) -> Result<ArrayRef> {
135 match target_data_type {
136 DataType::Float64 => map_to_f64(col).map(to_array_ref),
137 DataType::Float32 => map_to_f32(col).map(to_array_ref),
138 DataType::UInt64 => map_to_uint64(col).map(to_array_ref),
139 DataType::UInt32 => map_to_uint32(col).map(to_array_ref),
140 DataType::Int64 => map_to_int64(col).map(to_array_ref),
141 DataType::Int32 => map_to_int32(col).map(to_array_ref),
142 DataType::IntStr => map_to_int_str(col).map(to_array_ref),
143 DataType::Decimal256 => map_to_decimal(col).map(to_array_ref),
144 DataType::Decimal128 => map_to_decimal128(col).map(to_array_ref),
145 }
146}
147
148fn map_to_decimal(col: &dyn Array) -> Result<Decimal256Array> {
149 match col.data_type() {
150 &ArrowDataType::Binary => {
151 binary_to_decimal_array(col.as_any().downcast_ref::<BinaryArray>().unwrap())
152 }
153 dt => Err(anyhow!("Can't convert {:?} to decimal", dt)),
154 }
155}
156
157fn binary_to_decimal_array(arr: &BinaryArray) -> Result<Decimal256Array> {
158 let mut out = Decimal256Builder::with_capacity(arr.len());
159
160 for val in arr.iter() {
161 out.append_option(val.map(binary_to_decimal).transpose()?);
162 }
163
164 Ok(out.finish())
165}
166
167fn binary_to_decimal(binary: &[u8]) -> Result<i256> {
168 let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
169 let decimal = i256::from_be_bytes(big_num.to_be_bytes::<32>());
170
171 Ok(decimal)
172}
173
174fn map_to_int_str(col: &dyn Array) -> Result<StringArray> {
175 match col.data_type() {
176 &ArrowDataType::Binary => {
177 binary_to_int_str_array(col.as_any().downcast_ref::<BinaryArray>().unwrap())
178 }
179 dt => Err(anyhow!("Can't convert {:?} to intstr", dt)),
180 }
181}
182
183fn binary_to_int_str_array(arr: &BinaryArray) -> Result<StringArray> {
184 let mut out = StringBuilder::new();
185
186 for val in arr.iter() {
187 out.append_option(val.map(binary_to_int_str).transpose()?);
188 }
189
190 Ok(out.finish())
191}
192
193fn binary_to_int_str(binary: &[u8]) -> Result<String> {
194 let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
195 Ok(format!("{big_num}"))
196}
197
198fn map_to_f64(col: &dyn Array) -> Result<Float64Array> {
199 match col.data_type() {
200 &ArrowDataType::Binary => binary_to_target_array(col.as_binary(), binary_to_f64),
201 &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Float64)
202 .context("arrow cast")?
203 .as_primitive()
204 .clone()),
205 dt => Err(anyhow!("Can't convert {:?} to f64", dt)),
206 }
207}
208
209fn map_to_f32(col: &dyn Array) -> Result<Float32Array> {
210 match col.data_type() {
211 &ArrowDataType::Binary => binary_to_target_array(col.as_binary(), binary_to_f32),
212 &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Float32)
213 .context("arrow cast")?
214 .as_primitive()
215 .clone()),
216 dt => Err(anyhow!("Can't convert {:?} to f32", dt)),
217 }
218}
219
220fn map_to_uint64(col: &dyn Array) -> Result<UInt64Array> {
221 match col.data_type() {
222 &ArrowDataType::Binary => {
223 binary_to_target_array(col.as_binary(), signed_binary_to_target::<u64>)
224 }
225 &ArrowDataType::UInt64 => Ok(col.as_primitive().clone()),
226 dt => Err(anyhow!("Can't convert {:?} to uint64", dt)),
227 }
228}
229
230fn map_to_uint32(col: &dyn Array) -> Result<UInt32Array> {
231 match col.data_type() {
232 &ArrowDataType::Binary => {
233 binary_to_target_array(col.as_binary(), signed_binary_to_target::<u32>)
234 }
235 &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::UInt32)
236 .context("arrow cast")?
237 .as_primitive()
238 .clone()),
239 dt => Err(anyhow!("Can't convert {:?} to uint32", dt)),
240 }
241}
242
243fn map_to_decimal128(col: &dyn Array) -> Result<Decimal128Array> {
244 match col.data_type() {
245 &ArrowDataType::Binary => {
246 binary_to_target_array(col.as_binary(), signed_binary_to_target::<i128>)
247 }
248 &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Decimal128(38, 0))
249 .context("arrow cast")?
250 .as_primitive()
251 .clone()),
252 dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
253 }
254}
255
256fn map_to_int64(col: &dyn Array) -> Result<Int64Array> {
257 match col.data_type() {
258 &ArrowDataType::Binary => {
259 binary_to_target_array(col.as_binary(), signed_binary_to_target::<i64>)
260 }
261 &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Int64)
262 .context("arrow cast")?
263 .as_primitive()
264 .clone()),
265 dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
266 }
267}
268
269fn map_to_int32(col: &dyn Array) -> Result<Int32Array> {
270 match col.data_type() {
271 &ArrowDataType::Binary => {
272 binary_to_target_array(col.as_binary(), signed_binary_to_target::<i32>)
273 }
274 &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Int32)
275 .context("arrow cast")?
276 .as_primitive()
277 .clone()),
278 dt => Err(anyhow!("Can't convert {:?} to int32", dt)),
279 }
280}
281
282fn binary_to_target_array<T: ArrowPrimitiveType>(
283 src: &BinaryArray,
284 convert: fn(&[u8]) -> Result<T::Native>,
285) -> Result<PrimitiveArray<T>> {
286 let mut out = PrimitiveBuilder::<T>::with_capacity(src.len());
287
288 for val in src.iter() {
289 out.append_option(val.map(convert).transpose()?);
290 }
291
292 Ok(out.finish())
293}
294
295fn signed_binary_to_target<T: TryFrom<I256>>(src: &[u8]) -> Result<T> {
296 let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
297
298 big_num
299 .try_into()
300 .map_err(|_e| anyhow!("failed to cast number to requested signed type"))
301}
302
303fn binary_to_f64(src: &[u8]) -> Result<f64> {
305 let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
306
307 let x = f64::from(U256::try_from(big_num.abs()).unwrap());
308
309 if !big_num.is_negative() {
310 Ok(x)
311 } else {
312 Ok(-x)
313 }
314}
315
316fn binary_to_f32(src: &[u8]) -> Result<f32> {
318 let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
319
320 let x = f32::from(U256::try_from(big_num.abs()).unwrap());
321
322 if !big_num.is_negative() {
323 Ok(x)
324 } else {
325 Ok(-x)
326 }
327}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a set of `i64` values (including the extremes) through each of the
    /// per-value binary converters and checks the results against the source number.
    #[test]
    fn test_signed_binary_to_target() {
        const RAW_INPUT: &[i64] = &[-69, 0, 69, -1, 1, i64::MAX, i64::MIN];

        for raw in RAW_INPUT.iter().copied() {
            let expected = I256::try_from(raw).unwrap();
            let encoded = expected.to_be_bytes::<32>();
            let bytes = encoded.as_slice();

            // Integer narrowing.
            let as_i64 = signed_binary_to_target::<i64>(bytes).unwrap();
            assert_eq!(i64::try_from(expected).unwrap(), as_i64);

            // Floating point conversion.
            let as_f64 = binary_to_f64(bytes).unwrap();
            assert_eq!(I256::try_from(as_f64 as i64).unwrap(), expected);

            // String rendering.
            let as_text = binary_to_int_str(bytes).unwrap();
            assert_eq!(as_text, raw.to_string());

            // Decimal re-encoding.
            let as_decimal = binary_to_decimal(bytes).unwrap();
            assert_eq!(as_decimal.to_be_bytes(), bytes);
            assert_eq!(as_decimal.to_string(), expected.to_string());
        }
    }
}