1use std::collections::BTreeMap;
2
3use anyhow::{anyhow, Context, Result};
4use hyperfuel_schema::ArrowChunk;
5use polars_arrow::array::{
6 Array, BinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, MutablePrimitiveArray,
7 PrimitiveArray, UInt32Array, UInt64Array,
8};
9use polars_arrow::compute::cast::CastOptionsImpl as CastOptions;
10use polars_arrow::compute::{self, cast};
11use polars_arrow::datatypes::{ArrowDataType, ArrowSchema as Schema, Field};
12use polars_arrow::types::NativeType;
13use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
14use ruint::aliases::U256;
15use serde::{Deserialize, Serialize};
16
17use crate::ArrowBatch;
18
19#[derive(Default, Debug, Clone, Serialize, Deserialize)]
22pub struct ColumnMapping {
23 #[serde(default)]
25 pub block: BTreeMap<String, DataType>,
26 #[serde(default)]
28 pub transaction: BTreeMap<String, DataType>,
29 #[serde(default)]
31 pub receipt: BTreeMap<String, DataType>,
32 #[serde(default)]
34 pub input: BTreeMap<String, DataType>,
35 #[serde(default)]
37 pub output: BTreeMap<String, DataType>,
38}
39
40#[allow(missing_docs)]
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(rename_all = "lowercase")]
45pub enum DataType {
46 Float64,
47 Float32,
48 UInt64,
49 UInt32,
50 Int64,
51 Int32,
52}
53
54impl From<DataType> for ArrowDataType {
55 fn from(value: DataType) -> Self {
56 match value {
57 DataType::Float64 => Self::Float64,
58 DataType::Float32 => Self::Float32,
59 DataType::UInt64 => Self::UInt64,
60 DataType::UInt32 => Self::UInt32,
61 DataType::Int64 => Self::Int64,
62 DataType::Int32 => Self::Int32,
63 }
64 }
65}
66
67pub fn apply_to_batch(
68 batch: &ArrowBatch,
69 mapping: &BTreeMap<String, DataType>,
70) -> Result<ArrowBatch> {
71 if mapping.is_empty() {
72 return Ok(batch.clone());
73 }
74
75 let (fields, cols) = batch
76 .chunk
77 .columns()
78 .par_iter()
79 .zip(batch.schema.fields.par_iter())
80 .map(|(col, field)| {
81 let col = match mapping.get(&field.name) {
82 Some(&dt) => {
83 if field.name == "l1_fee_scalar" {
84 map_l1_fee_scalar(&**col, dt)
85 .context(format!("apply cast to column '{}'", field.name))?
86 } else {
87 map_column(&**col, dt)
88 .context(format!("apply cast to colum '{}'", field.name))?
89 }
90 }
91 None => col.clone(),
92 };
93
94 Ok((
95 Field::new(
96 field.name.clone(),
97 col.data_type().clone(),
98 field.is_nullable,
99 ),
100 col,
101 ))
102 })
103 .collect::<Result<(Vec<_>, Vec<_>)>>()?;
104
105 Ok(ArrowBatch {
106 chunk: ArrowChunk::new(cols).into(),
107 schema: Schema::from(fields).into(),
108 })
109}
110
111pub fn map_l1_fee_scalar(
112 col: &dyn Array,
113 target_data_type: DataType,
114) -> Result<Box<dyn Array + 'static>> {
115 let col = col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
116 let col = Float64Array::from_iter(
117 col.iter()
118 .map(|v| v.map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())),
119 );
120
121 let arr = compute::cast::cast(
122 &*to_box(col),
123 &target_data_type.into(),
124 CastOptions {
125 wrapped: false,
126 partial: false,
127 },
128 )
129 .with_context(|| anyhow!("failed to l1_fee_scalar to {:?}", target_data_type))?;
130
131 Ok(arr)
132}
133
134fn to_box<T: Array>(arr: T) -> Box<dyn Array> {
135 Box::new(arr)
136}
137
138fn map_column(col: &dyn Array, target_data_type: DataType) -> Result<Box<dyn Array + 'static>> {
139 match target_data_type {
140 DataType::Float64 => map_to_f64(col).map(to_box),
141 DataType::Float32 => map_to_f32(col).map(to_box),
142 DataType::UInt64 => map_to_uint64(col).map(to_box),
143 DataType::UInt32 => map_to_uint32(col).map(to_box),
144 DataType::Int64 => map_to_int64(col).map(to_box),
145 DataType::Int32 => map_to_int32(col).map(to_box),
146 }
147}
148
149fn map_to_f64(col: &dyn Array) -> Result<Float64Array> {
150 match col.data_type() {
151 &ArrowDataType::Binary => {
152 binary_to_target_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
153 }
154 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
155 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
156 &ArrowDataType::Float64,
157 )),
158 dt => Err(anyhow!("Can't convert {:?} to f64", dt)),
159 }
160}
161
162fn map_to_f32(col: &dyn Array) -> Result<Float32Array> {
163 match col.data_type() {
164 &ArrowDataType::Binary => {
165 binary_to_target_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
166 }
167 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
168 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
169 &ArrowDataType::Float32,
170 )),
171 dt => Err(anyhow!("Can't convert {:?} to f32", dt)),
172 }
173}
174
175fn map_to_uint64(col: &dyn Array) -> Result<UInt64Array> {
176 match col.data_type() {
177 &ArrowDataType::Binary => {
178 binary_to_target_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
179 }
180 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
181 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
182 &ArrowDataType::UInt64,
183 )),
184 dt => Err(anyhow!("Can't convert {:?} to uint64", dt)),
185 }
186}
187
188fn map_to_uint32(col: &dyn Array) -> Result<UInt32Array> {
189 match col.data_type() {
190 &ArrowDataType::Binary => {
191 binary_to_target_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
192 }
193 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
194 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
195 &ArrowDataType::UInt32,
196 )),
197 dt => Err(anyhow!("Can't convert {:?} to uint32", dt)),
198 }
199}
200
201fn map_to_int64(col: &dyn Array) -> Result<Int64Array> {
202 match col.data_type() {
203 &ArrowDataType::Binary => {
204 binary_to_target_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
205 }
206 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
207 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
208 &ArrowDataType::Int64,
209 )),
210 dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
211 }
212}
213
214fn map_to_int32(col: &dyn Array) -> Result<Int32Array> {
215 match col.data_type() {
216 &ArrowDataType::Binary => {
217 binary_to_target_array(col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
218 }
219 &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive(
220 col.as_any().downcast_ref::<UInt64Array>().unwrap(),
221 &ArrowDataType::Int32,
222 )),
223 dt => Err(anyhow!("Can't convert {:?} to int32", dt)),
224 }
225}
226
227fn binary_to_target_array<T: NativeType + TryFrom<U256>>(
228 src: &BinaryArray<i32>,
229) -> Result<PrimitiveArray<T>> {
230 let mut out = MutablePrimitiveArray::with_capacity(src.len());
231
232 for val in src.iter() {
233 out.push(val.map(binary_to_target).transpose()?);
234 }
235
236 Ok(out.into())
237}
238
239fn binary_to_target<T: TryFrom<U256>>(src: &[u8]) -> Result<T> {
240 let big_num = U256::from_be_slice(src);
241 big_num
242 .try_into()
243 .map_err(|_e| anyhow!("failed to cast number to requested type"))
244}