1use std::collections::BTreeMap;
2
3use alloy_primitives::I256;
4use anyhow::{anyhow, Context, Result};
5use hypersync_schema::ArrowChunk;
6use polars_arrow::array::{
7 Array, BinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, MutablePrimitiveArray,
8 MutableUtf8Array, PrimitiveArray, UInt32Array, UInt64Array, Utf8Array,
9};
10use polars_arrow::compute::cast::CastOptionsImpl as CastOptions;
11use polars_arrow::compute::{self, cast};
12use polars_arrow::datatypes::{ArrowDataType, ArrowSchema as Schema, Field};
13use polars_arrow::types::NativeType;
14use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
15use ruint::aliases::U256;
16use serde::{Deserialize, Serialize};
17
18use crate::ArrowBatch;
19
/// Per-table column cast configuration.
///
/// Each map is keyed by a column name and holds the [`DataType`] that column
/// should be converted to. Every map is annotated with `#[serde(default)]`,
/// so a table that is absent from the serialized form deserializes to an
/// empty map.
///
/// NOTE(review): each map is presumably handed to [`apply_to_batch`] for the
/// matching table's batches — confirm against the callers.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct ColumnMapping {
    /// Casts for block columns.
    #[serde(default)]
    pub block: BTreeMap<String, DataType>,
    /// Casts for transaction columns.
    #[serde(default)]
    pub transaction: BTreeMap<String, DataType>,
    /// Casts for log columns.
    #[serde(default)]
    pub log: BTreeMap<String, DataType>,
    /// Casts for trace columns.
    #[serde(default)]
    pub trace: BTreeMap<String, DataType>,
    /// Casts for decoded log columns.
    #[serde(default)]
    pub decoded_log: BTreeMap<String, DataType>,
}
40
/// Target type a column can be cast to.
///
/// Because of `rename_all = "lowercase"`, variants are (de)serialized as
/// lowercase names, e.g. `float64`, `uint32`, `intstr`.
#[allow(missing_docs)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DataType {
    // Maps to `ArrowDataType::Float64`.
    Float64,
    // Maps to `ArrowDataType::Float32`.
    Float32,
    // Maps to `ArrowDataType::UInt64`.
    UInt64,
    // Maps to `ArrowDataType::UInt32`.
    UInt32,
    // Maps to `ArrowDataType::Int64`.
    Int64,
    // Maps to `ArrowDataType::Int32`.
    Int32,
    // Integer rendered as a string; maps to `ArrowDataType::Utf8`.
    IntStr,
}
55
56impl From<DataType> for ArrowDataType {
57 fn from(value: DataType) -> Self {
58 match value {
59 DataType::Float64 => Self::Float64,
60 DataType::Float32 => Self::Float32,
61 DataType::UInt64 => Self::UInt64,
62 DataType::UInt32 => Self::UInt32,
63 DataType::Int64 => Self::Int64,
64 DataType::Int32 => Self::Int32,
65 DataType::IntStr => Self::Utf8,
66 }
67 }
68}
69
70pub fn apply_to_batch(
71 batch: &ArrowBatch,
72 mapping: &BTreeMap<String, DataType>,
73) -> Result<ArrowBatch> {
74 if mapping.is_empty() {
75 return Ok(batch.clone());
76 }
77
78 let (fields, cols) = batch
79 .chunk
80 .columns()
81 .par_iter()
82 .zip(batch.schema.fields.par_iter())
83 .map(|(col, field)| {
84 let col = match mapping.get(&field.name) {
85 Some(&dt) => {
86 if field.name == "l1_fee_scalar" {
87 map_l1_fee_scalar(&**col, dt)
88 .context(format!("apply cast to column '{}'", field.name))?
89 } else {
90 map_column(&**col, dt)
91 .context(format!("apply cast to colum '{}'", field.name))?
92 }
93 }
94 None => col.clone(),
95 };
96
97 Ok((
98 Field::new(
99 field.name.clone(),
100 col.data_type().clone(),
101 field.is_nullable,
102 ),
103 col,
104 ))
105 })
106 .collect::<Result<(Vec<_>, Vec<_>)>>()?;
107
108 Ok(ArrowBatch {
109 chunk: ArrowChunk::new(cols).into(),
110 schema: Schema::from(fields).into(),
111 })
112}
113
114pub fn map_l1_fee_scalar(
115 col: &dyn Array,
116 target_data_type: DataType,
117) -> Result<Box<dyn Array + 'static>> {
118 let col = col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
119 let col = Float64Array::from_iter(
120 col.iter()
121 .map(|v| v.map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())),
122 );
123
124 let arr = compute::cast::cast(
125 &*to_box(col),
126 &target_data_type.into(),
127 CastOptions {
128 wrapped: false,
129 partial: false,
130 },
131 )
132 .with_context(|| anyhow!("failed to l1_fee_scalar to {:?}", target_data_type))?;
133
134 Ok(arr)
135}
136
137fn to_box<T: Array>(arr: T) -> Box<dyn Array> {
138 Box::new(arr)
139}
140
141fn map_column(col: &dyn Array, target_data_type: DataType) -> Result<Box<dyn Array + 'static>> {
142 match target_data_type {
143 DataType::Float64 => map_to_f64(col).map(to_box),
144 DataType::Float32 => map_to_f32(col).map(to_box),
145 DataType::UInt64 => map_to_uint64(col).map(to_box),
146 DataType::UInt32 => map_to_uint32(col).map(to_box),
147 DataType::Int64 => map_to_int64(col).map(to_box),
148 DataType::Int32 => map_to_int32(col).map(to_box),
149 DataType::IntStr => map_to_int_str(col).map(to_box),
150 }
151}
152
153fn map_to_int_str(col: &dyn Array) -> Result<Utf8Array<i32>> {
154 match col.data_type() {
155 &ArrowDataType::Binary => {
156 binary_to_int_str_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
157 }
158 dt => Err(anyhow!("Can't convert {:?} to intstr", dt)),
159 }
160}
161
162fn binary_to_int_str_array(arr: &BinaryArray<i32>) -> Result<Utf8Array<i32>> {
163 let mut out = MutableUtf8Array::with_capacity(arr.len());
164
165 for val in arr.iter() {
166 out.push(val.map(binary_to_int_str).transpose()?);
167 }
168
169 Ok(out.into())
170}
171
172fn binary_to_int_str(binary: &[u8]) -> Result<String> {
173 let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
174 Ok(format!("{}", big_num))
175}
176
177fn map_to_f64(col: &dyn Array) -> Result<Float64Array> {
178 match col.data_type() {
179 &ArrowDataType::Binary => binary_to_target_array(
180 col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
181 binary_to_f64,
182 ),
183 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
184 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
185 &ArrowDataType::Float64,
186 )),
187 dt => Err(anyhow!("Can't convert {:?} to f64", dt)),
188 }
189}
190
191fn map_to_f32(col: &dyn Array) -> Result<Float32Array> {
192 match col.data_type() {
193 &ArrowDataType::Binary => binary_to_target_array(
194 col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
195 binary_to_f32,
196 ),
197 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
198 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
199 &ArrowDataType::Float32,
200 )),
201 dt => Err(anyhow!("Can't convert {:?} to f32", dt)),
202 }
203}
204
205fn map_to_uint64(col: &dyn Array) -> Result<UInt64Array> {
206 match col.data_type() {
207 &ArrowDataType::Binary => binary_to_target_array(
208 col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
209 signed_binary_to_target::<u64>,
210 ),
211 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
212 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
213 &ArrowDataType::UInt64,
214 )),
215 dt => Err(anyhow!("Can't convert {:?} to uint64", dt)),
216 }
217}
218
219fn map_to_uint32(col: &dyn Array) -> Result<UInt32Array> {
220 match col.data_type() {
221 &ArrowDataType::Binary => binary_to_target_array(
222 col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
223 signed_binary_to_target::<u32>,
224 ),
225 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
226 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
227 &ArrowDataType::UInt32,
228 )),
229 dt => Err(anyhow!("Can't convert {:?} to uint32", dt)),
230 }
231}
232
233fn map_to_int64(col: &dyn Array) -> Result<Int64Array> {
234 match col.data_type() {
235 &ArrowDataType::Binary => binary_to_target_array(
236 col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
237 signed_binary_to_target::<i64>,
238 ),
239 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
240 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
241 &ArrowDataType::Int64,
242 )),
243 dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
244 }
245}
246
247fn map_to_int32(col: &dyn Array) -> Result<Int32Array> {
248 match col.data_type() {
249 &ArrowDataType::Binary => binary_to_target_array(
250 col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(),
251 signed_binary_to_target::<i32>,
252 ),
253 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
254 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
255 &ArrowDataType::Int32,
256 )),
257 dt => Err(anyhow!("Can't convert {:?} to int32", dt)),
258 }
259}
260
261fn binary_to_target_array<T: NativeType>(
262 src: &BinaryArray<i32>,
263 convert: fn(&[u8]) -> Result<T>,
264) -> Result<PrimitiveArray<T>> {
265 let mut out = MutablePrimitiveArray::with_capacity(src.len());
266
267 for val in src.iter() {
268 out.push(val.map(convert).transpose()?);
269 }
270
271 Ok(out.into())
272}
273
274fn signed_binary_to_target<T: TryFrom<I256>>(src: &[u8]) -> Result<T> {
275 let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
276
277 big_num
278 .try_into()
279 .map_err(|_e| anyhow!("failed to cast number to requested signed type"))
280}
281
282fn binary_to_f64(src: &[u8]) -> Result<f64> {
284 let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
285
286 let x = f64::from(U256::try_from(big_num.abs()).unwrap());
287
288 if !big_num.is_negative() {
289 Ok(x)
290 } else {
291 Ok(-x)
292 }
293}
294
295fn binary_to_f32(src: &[u8]) -> Result<f32> {
297 let big_num = I256::try_from_be_slice(src).context("failed to parse number into I256")?;
298
299 let x = f32::from(U256::try_from(big_num.abs()).unwrap());
300
301 if !big_num.is_negative() {
302 Ok(x)
303 } else {
304 Ok(-x)
305 }
306}
307
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a handful of small signed values through the integer,
    /// float and string conversion helpers.
    #[test]
    fn test_signed_binary_to_target() {
        for expected in [-69i64, 0, 69, -1, 1] {
            let value = I256::try_from(expected).unwrap();
            let be_bytes = value.to_be_bytes::<32>();

            // Integer conversion must reproduce the original number.
            let as_int = signed_binary_to_target::<i64>(be_bytes.as_slice()).unwrap();
            assert_eq!(i64::try_from(value).unwrap(), as_int);

            // Float conversion must be exact for these small values.
            let as_float = binary_to_f64(be_bytes.as_slice()).unwrap();
            assert_eq!(I256::try_from(as_float as i64).unwrap(), value);

            // String conversion must match the decimal rendering.
            let as_text = binary_to_int_str(be_bytes.as_slice()).unwrap();
            assert_eq!(as_text, format!("{}", expected));
        }
    }
}