1use std::{collections::BTreeMap, sync::Arc};
2
3use alloy_primitives::I256;
4use anyhow::{anyhow, Context, Result};
5use arrow::{
6 array::{
7 Array, ArrayRef, ArrowPrimitiveType, AsArray, BinaryArray, Decimal128Array,
8 Decimal256Array, Decimal256Builder, Float32Array, Float64Array, Int32Array, Int64Array,
9 PrimitiveArray, PrimitiveBuilder, RecordBatch, StringArray, StringBuilder, UInt32Array,
10 UInt64Array,
11 },
12 compute,
13 datatypes::{i256, DataType as ArrowDataType, Field, Schema},
14};
15use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
16use ruint::aliases::U256;
17use schemars::JsonSchema;
18use serde::{Deserialize, Serialize};
19
/// Per-table overrides for the type of individual columns.
///
/// Each map is keyed by column name and gives the [`DataType`] that column should be
/// converted to. Presumably each map is passed to `apply_to_batch` for batches of the
/// matching table — confirm against the caller. Every field is `#[serde(default)]`, so a
/// table omitted from the serialized form deserializes to an empty map (no overrides).
#[derive(Default, Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ColumnMapping {
    /// Type overrides for block columns, keyed by column name.
    #[serde(default)]
    pub block: BTreeMap<String, DataType>,
    /// Type overrides for transaction columns, keyed by column name.
    #[serde(default)]
    pub transaction: BTreeMap<String, DataType>,
    /// Type overrides for log columns, keyed by column name.
    #[serde(default)]
    pub log: BTreeMap<String, DataType>,
    /// Type overrides for trace columns, keyed by column name.
    #[serde(default)]
    pub trace: BTreeMap<String, DataType>,
    /// Type overrides for decoded-log columns, keyed by column name.
    #[serde(default)]
    pub decoded_log: BTreeMap<String, DataType>,
}
40
/// Target type a mapped column is converted to.
///
/// Each variant corresponds to one Arrow type through `From<DataType> for ArrowDataType`.
/// Variant names are (de)serialized in lowercase (e.g. `"float64"`, `"intstr"`,
/// `"decimal256"`).
#[allow(missing_docs)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum DataType {
    // Arrow Float64.
    Float64,
    // Arrow Float32.
    Float32,
    // Arrow UInt64.
    UInt64,
    // Arrow UInt32.
    UInt32,
    // Arrow Int64.
    Int64,
    // Arrow Int32.
    Int32,
    // Arrow Utf8 holding the integer rendered as a base-10 string.
    IntStr,
    // Arrow Decimal256 with precision 76, scale 0.
    Decimal256,
    // Arrow Decimal128 with precision 38, scale 0.
    Decimal128,
}
57
58impl From<DataType> for ArrowDataType {
59 fn from(value: DataType) -> Self {
60 match value {
61 DataType::Float64 => Self::Float64,
62 DataType::Float32 => Self::Float32,
63 DataType::UInt64 => Self::UInt64,
64 DataType::UInt32 => Self::UInt32,
65 DataType::Int64 => Self::Int64,
66 DataType::Int32 => Self::Int32,
67 DataType::IntStr => Self::Utf8,
68 DataType::Decimal256 => Self::Decimal256(76, 0),
69 DataType::Decimal128 => Self::Decimal128(38, 0),
70 }
71 }
72}
73
74pub fn apply_to_batch(
75 batch: &RecordBatch,
76 mapping: &BTreeMap<String, DataType>,
77) -> Result<RecordBatch> {
78 if mapping.is_empty() {
79 return Ok(batch.clone());
80 }
81
82 let (fields, cols) = batch
83 .columns()
84 .par_iter()
85 .zip(batch.schema().fields().par_iter())
86 .map(|(col, field)| {
87 let col = match mapping.get(field.name()) {
88 Some(&dt) => {
89 if field.name() == "l1_fee_scalar" {
90 map_l1_fee_scalar(&**col, dt)
91 .context(format!("apply cast to column '{}'", field.name()))?
92 } else {
93 map_column(&**col, dt)
94 .context(format!("apply cast to column '{}'", field.name()))?
95 }
96 }
97 None => col.clone(),
98 };
99
100 Ok((
101 Field::new(
102 field.name().clone(),
103 col.data_type().clone(),
104 field.is_nullable(),
105 ),
106 col,
107 ))
108 })
109 .collect::<Result<(Vec<_>, Vec<_>)>>()?;
110
111 let schema = Arc::new(Schema::new(fields));
112
113 Ok(RecordBatch::try_new(schema, cols).unwrap())
114}
115
116pub fn map_l1_fee_scalar(col: &dyn Array, target_data_type: DataType) -> Result<ArrayRef> {
117 let col = col.as_any().downcast_ref::<BinaryArray>().unwrap();
118 let col = Float64Array::from_iter(
119 col.iter()
120 .map(|v| v.map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())),
121 );
122
123 let arrow_dt = ArrowDataType::from(target_data_type);
124
125 let arr = compute::cast(&col, &arrow_dt)
126 .with_context(|| anyhow!("failed to l1_fee_scalar to {:?}", target_data_type))?;
127
128 Ok(arr)
129}
130
131fn to_array_ref<Arr: Array + 'static>(arr: Arr) -> ArrayRef {
132 Arc::new(arr)
133}
134
135fn map_column(col: &dyn Array, target_data_type: DataType) -> Result<ArrayRef> {
136 match target_data_type {
137 DataType::Float64 => map_to_f64(col).map(to_array_ref),
138 DataType::Float32 => map_to_f32(col).map(to_array_ref),
139 DataType::UInt64 => map_to_uint64(col).map(to_array_ref),
140 DataType::UInt32 => map_to_uint32(col).map(to_array_ref),
141 DataType::Int64 => map_to_int64(col).map(to_array_ref),
142 DataType::Int32 => map_to_int32(col).map(to_array_ref),
143 DataType::IntStr => map_to_int_str(col).map(to_array_ref),
144 DataType::Decimal256 => map_to_decimal(col).map(to_array_ref),
145 DataType::Decimal128 => map_to_decimal128(col).map(to_array_ref),
146 }
147}
148
149fn map_to_decimal(col: &dyn Array) -> Result<Decimal256Array> {
150 match col.data_type() {
151 &ArrowDataType::Binary => {
152 binary_to_decimal_array(col.as_any().downcast_ref::<BinaryArray>().unwrap())
153 }
154 dt => Err(anyhow!("Can't convert {:?} to decimal", dt)),
155 }
156}
157
158fn binary_to_decimal_array(arr: &BinaryArray) -> Result<Decimal256Array> {
159 let mut out = Decimal256Builder::with_capacity(arr.len());
160
161 for val in arr.iter() {
162 out.append_option(val.map(binary_to_decimal).transpose()?);
163 }
164
165 Ok(out.finish())
166}
167
168fn binary_to_decimal(binary: &[u8]) -> Result<i256> {
169 let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
170 let decimal = i256::from_be_bytes(big_num.to_be_bytes::<32>());
171
172 Ok(decimal)
173}
174
175fn map_to_int_str(col: &dyn Array) -> Result<StringArray> {
176 match col.data_type() {
177 &ArrowDataType::Binary => {
178 binary_to_int_str_array(col.as_any().downcast_ref::<BinaryArray>().unwrap())
179 }
180 dt => Err(anyhow!("Can't convert {:?} to intstr", dt)),
181 }
182}
183
184fn binary_to_int_str_array(arr: &BinaryArray) -> Result<StringArray> {
185 let mut out = StringBuilder::new();
186
187 for val in arr.iter() {
188 out.append_option(val.map(binary_to_int_str).transpose()?);
189 }
190
191 Ok(out.finish())
192}
193
194fn binary_to_int_str(binary: &[u8]) -> Result<String> {
195 let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
196 Ok(format!("{big_num}"))
197}
198
199fn map_to_f64(col: &dyn Array) -> Result<Float64Array> {
200 match col.data_type() {
201 &ArrowDataType::Binary => binary_to_target_array(col.as_binary(), binary_to_f64),
202 &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Float64)
203 .context("arrow cast")?
204 .as_primitive()
205 .clone()),
206 dt => Err(anyhow!("Can't convert {:?} to f64", dt)),
207 }
208}
209
210fn map_to_f32(col: &dyn Array) -> Result<Float32Array> {
211 match col.data_type() {
212 &ArrowDataType::Binary => binary_to_target_array(col.as_binary(), binary_to_f32),
213 &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Float32)
214 .context("arrow cast")?
215 .as_primitive()
216 .clone()),
217 dt => Err(anyhow!("Can't convert {:?} to f32", dt)),
218 }
219}
220
221fn map_to_uint64(col: &dyn Array) -> Result<UInt64Array> {
222 match col.data_type() {
223 &ArrowDataType::Binary => {
224 binary_to_target_array(col.as_binary(), signed_binary_to_target::<u64>)
225 }
226 &ArrowDataType::UInt64 => Ok(col.as_primitive().clone()),
227 dt => Err(anyhow!("Can't convert {:?} to uint64", dt)),
228 }
229}
230
231fn map_to_uint32(col: &dyn Array) -> Result<UInt32Array> {
232 match col.data_type() {
233 &ArrowDataType::Binary => {
234 binary_to_target_array(col.as_binary(), signed_binary_to_target::<u32>)
235 }
236 &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::UInt32)
237 .context("arrow cast")?
238 .as_primitive()
239 .clone()),
240 dt => Err(anyhow!("Can't convert {:?} to uint32", dt)),
241 }
242}
243
244fn map_to_decimal128(col: &dyn Array) -> Result<Decimal128Array> {
245 match col.data_type() {
246 &ArrowDataType::Binary => {
247 binary_to_target_array(col.as_binary(), signed_binary_to_target::<i128>)
248 }
249 &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Decimal128(38, 0))
250 .context("arrow cast")?
251 .as_primitive()
252 .clone()),
253 dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
254 }
255}
256
257fn map_to_int64(col: &dyn Array) -> Result<Int64Array> {
258 match col.data_type() {
259 &ArrowDataType::Binary => {
260 binary_to_target_array(col.as_binary(), signed_binary_to_target::<i64>)
261 }
262 &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Int64)
263 .context("arrow cast")?
264 .as_primitive()
265 .clone()),
266 dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
267 }
268}
269
270fn map_to_int32(col: &dyn Array) -> Result<Int32Array> {
271 match col.data_type() {
272 &ArrowDataType::Binary => {
273 binary_to_target_array(col.as_binary(), signed_binary_to_target::<i32>)
274 }
275 &ArrowDataType::UInt64 => Ok(compute::cast(col, &ArrowDataType::Int32)
276 .context("arrow cast")?
277 .as_primitive()
278 .clone()),
279 dt => Err(anyhow!("Can't convert {:?} to int32", dt)),
280 }
281}
282
283fn binary_to_target_array<T: ArrowPrimitiveType>(
284 src: &BinaryArray,
285 convert: fn(&[u8]) -> Result<T::Native>,
286) -> Result<PrimitiveArray<T>> {
287 let mut out = PrimitiveBuilder::<T>::with_capacity(src.len());
288
289 for val in src.iter() {
290 out.append_option(val.map(convert).transpose()?);
291 }
292
293 Ok(out.finish())
294}
295
296fn signed_binary_to_target<T: TryFrom<I256>>(src: &[u8]) -> Result<T> {
297 let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
298
299 big_num
300 .try_into()
301 .map_err(|_e| anyhow!("failed to cast number to requested signed type"))
302}
303
304fn binary_to_f64(src: &[u8]) -> Result<f64> {
306 let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
307
308 let x = f64::from(U256::try_from(big_num.abs()).unwrap());
309
310 if !big_num.is_negative() {
311 Ok(x)
312 } else {
313 Ok(-x)
314 }
315}
316
317fn binary_to_f32(src: &[u8]) -> Result<f32> {
319 let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
320
321 let x = f32::from(U256::try_from(big_num.abs()).unwrap());
322
323 if !big_num.is_negative() {
324 Ok(x)
325 } else {
326 Ok(-x)
327 }
328}
329
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a spread of `i64` values (including both extremes) through each
    /// binary-decoding helper and checks every result against the source number.
    #[test]
    fn test_signed_binary_to_target() {
        const RAW_INPUT: &[i64] = &[-69, 0, 69, -1, 1, i64::MAX, i64::MIN];

        for input_num in RAW_INPUT.iter().copied() {
            let expected = I256::try_from(input_num).unwrap();
            let be_bytes = expected.to_be_bytes::<32>();
            let raw: &[u8] = be_bytes.as_slice();

            // Integer narrowing back to i64.
            let narrowed = signed_binary_to_target::<i64>(raw).unwrap();
            assert_eq!(i64::try_from(expected).unwrap(), narrowed);

            // Float conversion, compared after truncating back to i64.
            let as_float = binary_to_f64(raw).unwrap();
            assert_eq!(I256::try_from(as_float as i64).unwrap(), expected);

            // Base-10 string rendering.
            assert_eq!(binary_to_int_str(raw).unwrap(), input_num.to_string());

            // Arrow i256 keeps the exact byte pattern and the same Display form.
            let as_decimal = binary_to_decimal(raw).unwrap();
            assert_eq!(as_decimal.to_be_bytes(), raw);
            assert_eq!(as_decimal.to_string(), expected.to_string());
        }
    }
}