cherry_evm_decode/
lib.rs

1use std::sync::Arc;
2
3use alloy_dyn_abi::{DynSolCall, DynSolEvent, DynSolType, DynSolValue, Specifier};
4use alloy_primitives::{I256, U256};
5use anyhow::{anyhow, Context, Result};
6use arrow::{
7    array::{
8        builder, Array, ArrowPrimitiveType, BinaryArray, GenericBinaryArray, LargeBinaryArray,
9        ListArray, OffsetSizeTrait, RecordBatch, StructArray,
10    },
11    buffer::{NullBuffer, OffsetBuffer},
12    datatypes::{
13        DataType, Field, Fields, Int16Type, Int32Type, Int64Type, Int8Type, Schema, UInt16Type,
14        UInt32Type, UInt64Type, UInt8Type,
15    },
16};
17
18/// Returns topic0 based on given event signature
19pub fn signature_to_topic0(signature: &str) -> Result<[u8; 32]> {
20    let event = alloy_json_abi::Event::parse(signature).context("parse event signature")?;
21    Ok(event.selector().into())
22}
23
24/// Decodes given call input data in arrow format to arrow format.
25/// Output Arrow schema is auto generated based on the function signature.
26/// Handles any level of nesting with Lists/Structs.
27///
28/// Writes `null` for data rows that fail to decode if `allow_decode_fail` is set to `true`.
29/// Errors when a row fails to decode if `allow_decode_fail` is set to `false`.
30pub fn decode_call_inputs<I: OffsetSizeTrait>(
31    signature: &str,
32    data: &GenericBinaryArray<I>,
33    allow_decode_fail: bool,
34) -> Result<RecordBatch> {
35    decode_call_impl::<true, I>(signature, data, allow_decode_fail)
36}
37
38/// Decodes given call output data in arrow format to arrow format.
39/// Output Arrow schema is auto generated based on the function signature.
40/// Handles any level of nesting with Lists/Structs.
41///
42/// Writes `null` for data rows that fail to decode if `allow_decode_fail` is set to `true`.
43/// Errors when a row fails to decode if `allow_decode_fail` is set to `false`.
44pub fn decode_call_outputs<I: OffsetSizeTrait>(
45    signature: &str,
46    data: &GenericBinaryArray<I>,
47    allow_decode_fail: bool,
48) -> Result<RecordBatch> {
49    decode_call_impl::<false, I>(signature, data, allow_decode_fail)
50}
51
52// IS_INPUT: true means we are decoding inputs
53// false means we are decoding outputs
54fn decode_call_impl<const IS_INPUT: bool, I: OffsetSizeTrait>(
55    signature: &str,
56    data: &GenericBinaryArray<I>,
57    allow_decode_fail: bool,
58) -> Result<RecordBatch> {
59    let (call, resolved) = resolve_function_signature(signature)?;
60
61    let schema = function_signature_to_arrow_schemas_impl(&call, &resolved)
62        .context("convert event signature to arrow schema")?;
63    let schema = if IS_INPUT { schema.0 } else { schema.1 };
64
65    let mut arrays: Vec<Arc<dyn Array + 'static>> = Vec::with_capacity(schema.fields().len());
66
67    let mut decoded = Vec::<Option<DynSolValue>>::with_capacity(data.len());
68
69    for blob in data.iter() {
70        match blob {
71            Some(blob) => {
72                let decode_res = if IS_INPUT {
73                    resolved.abi_decode_input(blob)
74                } else {
75                    resolved.abi_decode_output(blob)
76                };
77                match decode_res {
78                    Ok(data) => decoded.push(Some(DynSolValue::Tuple(data))),
79                    Err(e) if allow_decode_fail => {
80                        log::debug!("failed to decode function call data: {}", e);
81                        decoded.push(None);
82                    }
83                    Err(e) => {
84                        return Err(anyhow!("failed to decode function call data: {}", e));
85                    }
86                }
87            }
88            None => decoded.push(None),
89        }
90    }
91
92    let sol_type = if IS_INPUT {
93        DynSolType::Tuple(resolved.types().to_vec())
94    } else {
95        DynSolType::Tuple(resolved.returns().types().to_vec())
96    };
97
98    let array = to_arrow(&sol_type, decoded, allow_decode_fail).context("map params to arrow")?;
99    match array.data_type() {
100        DataType::Struct(_) => {
101            let arr = array.as_any().downcast_ref::<StructArray>().unwrap();
102
103            for f in arr.columns().iter() {
104                arrays.push(f.clone());
105            }
106        }
107        _ => unreachable!(),
108    }
109
110    RecordBatch::try_new(Arc::new(schema), arrays).context("construct arrow batch")
111}
112
113/// Returns (input schema, output schema)
114pub fn function_signature_to_arrow_schemas(signature: &str) -> Result<(Schema, Schema)> {
115    let (func, resolved) = resolve_function_signature(signature)?;
116    function_signature_to_arrow_schemas_impl(&func, &resolved)
117}
118
119fn function_signature_to_arrow_schemas_impl(
120    func: &alloy_json_abi::Function,
121    call: &DynSolCall,
122) -> Result<(Schema, Schema)> {
123    let mut input_fields = Vec::with_capacity(call.types().len());
124    let mut output_fields = Vec::with_capacity(call.returns().types().len());
125
126    for (i, (sol_t, param)) in call.types().iter().zip(func.inputs.iter()).enumerate() {
127        let dtype = to_arrow_dtype(sol_t).context("map to arrow type")?;
128        let name = if param.name() == "" {
129            format!("param{}", i)
130        } else {
131            param.name().to_owned()
132        };
133        input_fields.push(Arc::new(Field::new(name, dtype, true)));
134    }
135
136    for (i, (sol_t, param)) in call
137        .returns()
138        .types()
139        .iter()
140        .zip(func.outputs.iter())
141        .enumerate()
142    {
143        let dtype = to_arrow_dtype(sol_t).context("map to arrow type")?;
144        let name = if param.name() == "" {
145            format!("param{}", i)
146        } else {
147            param.name().to_owned()
148        };
149        output_fields.push(Arc::new(Field::new(name, dtype, true)));
150    }
151
152    Ok((Schema::new(input_fields), Schema::new(output_fields)))
153}
154
155fn resolve_function_signature(signature: &str) -> Result<(alloy_json_abi::Function, DynSolCall)> {
156    let event = alloy_json_abi::Function::parse(signature).context("parse function signature")?;
157    let resolved = event.resolve().context("resolve function signature")?;
158
159    Ok((event, resolved))
160}
161
162/// Decodes given event data in arrow format to arrow format.
163/// Output Arrow schema is auto generated based on the event signature.
164/// Handles any level of nesting with Lists/Structs.
165///
166/// Writes `null` for event data rows that fail to decode if `allow_decode_fail` is set to `true`.
167/// Errors when a row fails to decode if `allow_decode_fail` is set to `false`.
168pub fn decode_events(
169    signature: &str,
170    data: &RecordBatch,
171    allow_decode_fail: bool,
172) -> Result<RecordBatch> {
173    let (event, resolved) = resolve_event_signature(signature)?;
174
175    let schema = event_signature_to_arrow_schema_impl(&event, &resolved)
176        .context("convert event signature to arrow schema")?;
177
178    let mut arrays: Vec<Arc<dyn Array + 'static>> = Vec::with_capacity(schema.fields().len());
179
180    for (sol_type, topic_name) in resolved
181        .indexed()
182        .iter()
183        .zip(["topic1", "topic2", "topic3"].iter())
184    {
185        let col = data
186            .column_by_name(topic_name)
187            .context("get topic column")?;
188
189        if col.data_type() == &DataType::Binary {
190            decode_topic(
191                sol_type,
192                col.as_any().downcast_ref::<BinaryArray>().unwrap(),
193                allow_decode_fail,
194                &mut arrays,
195            )
196            .context("decode topic")?;
197        } else if col.data_type() == &DataType::LargeBinary {
198            decode_topic(
199                sol_type,
200                col.as_any().downcast_ref::<LargeBinaryArray>().unwrap(),
201                allow_decode_fail,
202                &mut arrays,
203            )
204            .context("decode topic")?;
205        }
206    }
207
208    let body_col = data.column_by_name("data").context("get data column")?;
209
210    let body_sol_type = DynSolType::Tuple(resolved.body().to_vec());
211
212    if body_col.data_type() == &DataType::Binary {
213        decode_body(
214            &body_sol_type,
215            body_col.as_any().downcast_ref::<BinaryArray>().unwrap(),
216            allow_decode_fail,
217            &mut arrays,
218        )
219        .context("decode body")?;
220    } else if body_col.data_type() == &DataType::LargeBinary {
221        decode_body(
222            &body_sol_type,
223            body_col
224                .as_any()
225                .downcast_ref::<LargeBinaryArray>()
226                .unwrap(),
227            allow_decode_fail,
228            &mut arrays,
229        )
230        .context("decode body")?;
231    }
232
233    RecordBatch::try_new(Arc::new(schema), arrays).context("construct arrow batch")
234}
235
236fn decode_body<I: OffsetSizeTrait>(
237    body_sol_type: &DynSolType,
238    body_col: &GenericBinaryArray<I>,
239    allow_decode_fail: bool,
240    arrays: &mut Vec<Arc<dyn Array>>,
241) -> Result<()> {
242    let mut body_decoded = Vec::<Option<DynSolValue>>::with_capacity(body_col.len());
243
244    for blob in body_col.iter() {
245        match blob {
246            Some(blob) => match body_sol_type.abi_decode_sequence(blob) {
247                Ok(data) => body_decoded.push(Some(data)),
248                Err(e) if allow_decode_fail => {
249                    log::debug!("failed to decode body: {}", e);
250                    body_decoded.push(None);
251                }
252                Err(e) => {
253                    return Err(anyhow!("failed to decode body: {}", e));
254                }
255            },
256            None => body_decoded.push(None),
257        }
258    }
259
260    let body_array =
261        to_arrow(body_sol_type, body_decoded, allow_decode_fail).context("map body to arrow")?;
262    match body_array.data_type() {
263        DataType::Struct(_) => {
264            let arr = body_array.as_any().downcast_ref::<StructArray>().unwrap();
265
266            for f in arr.columns().iter() {
267                arrays.push(f.clone());
268            }
269        }
270        _ => unreachable!(),
271    }
272
273    Ok(())
274}
275
276fn decode_topic<I: OffsetSizeTrait>(
277    sol_type: &DynSolType,
278    col: &GenericBinaryArray<I>,
279    allow_decode_fail: bool,
280    arrays: &mut Vec<Arc<dyn Array>>,
281) -> Result<()> {
282    let mut decoded = Vec::<Option<DynSolValue>>::with_capacity(col.len());
283
284    for blob in col.iter() {
285        match blob {
286            Some(blob) => match sol_type.abi_decode(blob) {
287                Ok(data) => decoded.push(Some(data)),
288                Err(e) if allow_decode_fail => {
289                    log::debug!("failed to decode a topic: {}", e);
290                    decoded.push(None);
291                }
292                Err(e) => {
293                    return Err(anyhow!("failed to decode a topic: {}", e));
294                }
295            },
296            None => decoded.push(None),
297        }
298    }
299
300    arrays.push(to_arrow(sol_type, decoded, allow_decode_fail).context("map topic to arrow")?);
301
302    Ok(())
303}
304
305/// Generates Arrow schema based on given event signature
306pub fn event_signature_to_arrow_schema(signature: &str) -> Result<Schema> {
307    let (resolved, event) = resolve_event_signature(signature)?;
308    event_signature_to_arrow_schema_impl(&resolved, &event)
309}
310
311fn event_signature_to_arrow_schema_impl(
312    sig: &alloy_json_abi::Event,
313    event: &DynSolEvent,
314) -> Result<Schema> {
315    let num_fields = event.indexed().len() + event.body().len();
316    let mut fields = Vec::<Arc<Field>>::with_capacity(num_fields);
317    let mut names = Vec::with_capacity(num_fields);
318
319    for (i, input) in sig.inputs.iter().enumerate() {
320        if input.indexed {
321            let name = if input.name.is_empty() {
322                format!("param{}", i)
323            } else {
324                input.name.clone()
325            };
326            names.push(name);
327        }
328    }
329    for (i, input) in sig.inputs.iter().enumerate() {
330        if !input.indexed {
331            let name = if input.name.is_empty() {
332                format!("param{}", i)
333            } else {
334                input.name.clone()
335            };
336            names.push(name);
337        }
338    }
339
340    for (sol_t, name) in event.indexed().iter().chain(event.body()).zip(names) {
341        let dtype = to_arrow_dtype(sol_t).context("map to arrow type")?;
342        fields.push(Arc::new(Field::new(name, dtype, true)));
343    }
344
345    Ok(Schema::new(fields))
346}
347
348fn resolve_event_signature(signature: &str) -> Result<(alloy_json_abi::Event, DynSolEvent)> {
349    let event = alloy_json_abi::Event::parse(signature).context("parse event signature")?;
350    let resolved = event.resolve().context("resolve event signature")?;
351
352    Ok((event, resolved))
353}
354
355fn to_arrow_dtype(sol_type: &DynSolType) -> Result<DataType> {
356    match sol_type {
357        DynSolType::Bool => Ok(DataType::Boolean),
358        DynSolType::Bytes => Ok(DataType::Binary),
359        DynSolType::String => Ok(DataType::Utf8),
360        DynSolType::Address => Ok(DataType::Binary),
361        DynSolType::Int(num_bits) => Ok(num_bits_to_int_type(*num_bits)),
362        DynSolType::Uint(num_bits) => Ok(num_bits_to_uint_type(*num_bits)),
363        DynSolType::Array(inner_type) => {
364            let inner_type = to_arrow_dtype(inner_type).context("map inner")?;
365            Ok(DataType::List(Arc::new(Field::new("", inner_type, true))))
366        }
367        DynSolType::Function => Err(anyhow!(
368            "decoding 'Function' typed value in function signature isn't supported."
369        )),
370        DynSolType::FixedArray(inner_type, _) => {
371            let inner_type = to_arrow_dtype(inner_type).context("map inner")?;
372            Ok(DataType::List(Arc::new(Field::new("", inner_type, true))))
373        }
374        DynSolType::Tuple(fields) => {
375            let mut arrow_fields = Vec::<Arc<Field>>::with_capacity(fields.len());
376
377            for (i, f) in fields.iter().enumerate() {
378                let inner_dt = to_arrow_dtype(f).context("map field dt")?;
379                arrow_fields.push(Arc::new(Field::new(format!("param{}", i), inner_dt, true)));
380            }
381
382            Ok(DataType::Struct(Fields::from(arrow_fields)))
383        }
384        DynSolType::FixedBytes(_) => Ok(DataType::Binary),
385    }
386}
387
388fn num_bits_to_uint_type(num_bits: usize) -> DataType {
389    if num_bits <= 8 {
390        DataType::UInt8
391    } else if num_bits <= 16 {
392        DataType::UInt16
393    } else if num_bits <= 32 {
394        DataType::UInt32
395    } else if num_bits <= 64 {
396        DataType::UInt64
397    } else if num_bits <= 128 {
398        DataType::Decimal128(38, 0)
399    } else if num_bits <= 256 {
400        DataType::Decimal256(76, 0)
401    } else {
402        unreachable!()
403    }
404}
405
406fn num_bits_to_int_type(num_bits: usize) -> DataType {
407    if num_bits <= 8 {
408        DataType::Int8
409    } else if num_bits <= 16 {
410        DataType::Int16
411    } else if num_bits <= 32 {
412        DataType::Int32
413    } else if num_bits <= 64 {
414        DataType::Int64
415    } else if num_bits <= 128 {
416        DataType::Decimal128(38, 0)
417    } else if num_bits <= 256 {
418        DataType::Decimal256(76, 0)
419    } else {
420        unreachable!()
421    }
422}
423
424fn to_arrow(
425    sol_type: &DynSolType,
426    sol_values: Vec<Option<DynSolValue>>,
427    allow_decode_fail: bool,
428) -> Result<Arc<dyn Array>> {
429    match sol_type {
430        DynSolType::Bool => to_bool(&sol_values),
431        DynSolType::Bytes => to_binary(&sol_values),
432        DynSolType::String => to_string(&sol_values),
433        DynSolType::Address => to_binary(&sol_values),
434        DynSolType::Int(num_bits) => to_int(*num_bits, &sol_values, allow_decode_fail),
435        DynSolType::Uint(num_bits) => to_uint(*num_bits, &sol_values, allow_decode_fail),
436        DynSolType::Array(inner_type) => to_list(inner_type, sol_values, allow_decode_fail),
437        DynSolType::Function => Err(anyhow!(
438            "decoding 'Function' typed value in function signature isn't supported."
439        )),
440        DynSolType::FixedArray(inner_type, _) => to_list(inner_type, sol_values, allow_decode_fail),
441        DynSolType::Tuple(fields) => to_struct(fields, sol_values, allow_decode_fail),
442        DynSolType::FixedBytes(_) => to_binary(&sol_values),
443    }
444}
445
446fn to_int(
447    num_bits: usize,
448    sol_values: &[Option<DynSolValue>],
449    allow_decode_fail: bool,
450) -> Result<Arc<dyn Array>> {
451    match num_bits_to_int_type(num_bits) {
452        DataType::Int8 => to_int_impl::<Int8Type>(num_bits, sol_values),
453        DataType::Int16 => to_int_impl::<Int16Type>(num_bits, sol_values),
454        DataType::Int32 => to_int_impl::<Int32Type>(num_bits, sol_values),
455        DataType::Int64 => to_int_impl::<Int64Type>(num_bits, sol_values),
456        DataType::Decimal128(_, _) => to_decimal128(num_bits, sol_values),
457        DataType::Decimal256(_, _) => to_decimal256(num_bits, sol_values, allow_decode_fail),
458        _ => unreachable!(),
459    }
460}
461
462fn to_uint(
463    num_bits: usize,
464    sol_values: &[Option<DynSolValue>],
465    allow_decode_fail: bool,
466) -> Result<Arc<dyn Array>> {
467    match num_bits_to_int_type(num_bits) {
468        DataType::UInt8 => to_int_impl::<UInt8Type>(num_bits, sol_values),
469        DataType::UInt16 => to_int_impl::<UInt16Type>(num_bits, sol_values),
470        DataType::UInt32 => to_int_impl::<UInt32Type>(num_bits, sol_values),
471        DataType::UInt64 => to_int_impl::<UInt64Type>(num_bits, sol_values),
472        DataType::Decimal128(_, _) => to_decimal128(num_bits, sol_values),
473        DataType::Decimal256(_, _) => to_decimal256(num_bits, sol_values, allow_decode_fail),
474        _ => unreachable!(),
475    }
476}
477
478fn to_decimal128(num_bits: usize, sol_values: &[Option<DynSolValue>]) -> Result<Arc<dyn Array>> {
479    let mut builder = builder::Decimal128Builder::new();
480
481    for val in sol_values.iter() {
482        match val {
483            Some(val) => match val {
484                DynSolValue::Int(v, nb) => {
485                    assert_eq!(num_bits, *nb);
486
487                    let v = i128::try_from(*v).context("convert to i128")?;
488
489                    builder.append_value(v);
490                }
491                DynSolValue::Uint(v, nb) => {
492                    assert_eq!(num_bits, *nb);
493
494                    let v = i128::try_from(*v).context("convert to i128")?;
495
496                    builder.append_value(v);
497                }
498                _ => {
499                    return Err(anyhow!(
500                        "found unexpected value. Expected: bool, Found: {:?}",
501                        val
502                    ));
503                }
504            },
505            None => {
506                builder.append_null();
507            }
508        }
509    }
510
511    builder = builder.with_data_type(DataType::Decimal128(38, 0));
512
513    Ok(Arc::new(builder.finish()))
514}
515
516fn to_decimal256(
517    num_bits: usize,
518    sol_values: &[Option<DynSolValue>],
519    allow_decode_fail: bool,
520) -> Result<Arc<dyn Array>> {
521    let mut builder = builder::Decimal256Builder::new();
522
523    for val in sol_values.iter() {
524        match val {
525            Some(val) => match val {
526                DynSolValue::Int(v, nb) => {
527                    assert_eq!(num_bits, *nb);
528
529                    let v = arrow::datatypes::i256::from_be_bytes(v.to_be_bytes::<32>());
530
531                    builder.append_value(v);
532                }
533                DynSolValue::Uint(v, nb) => {
534                    assert_eq!(num_bits, *nb);
535                    match I256::try_from(*v).context("try u256 to i256") {
536                        Ok(v) => builder.append_value(arrow::datatypes::i256::from_be_bytes(
537                            v.to_be_bytes::<32>(),
538                        )),
539                        Err(e) => {
540                            if allow_decode_fail {
541                                log::debug!("failed to decode u256: {}", e);
542                                builder.append_null();
543                            } else {
544                                return Err(e);
545                            }
546                        }
547                    }
548                }
549                _ => {
550                    return Err(anyhow!(
551                        "found unexpected value. Expected: bool, Found: {:?}",
552                        val
553                    ));
554                }
555            },
556            None => {
557                builder.append_null();
558            }
559        }
560    }
561
562    builder = builder.with_data_type(DataType::Decimal256(76, 0));
563
564    Ok(Arc::new(builder.finish()))
565}
566
567fn to_int_impl<T>(num_bits: usize, sol_values: &[Option<DynSolValue>]) -> Result<Arc<dyn Array>>
568where
569    T: ArrowPrimitiveType,
570    T::Native: TryFrom<I256> + TryFrom<U256>,
571{
572    let mut builder = builder::PrimitiveBuilder::<T>::new();
573
574    for val in sol_values.iter() {
575        match val {
576            Some(val) => match val {
577                DynSolValue::Int(v, nb) => {
578                    assert_eq!(num_bits, *nb);
579                    builder.append_value(match T::Native::try_from(*v) {
580                        Ok(v) => v,
581                        Err(_) => unreachable!(),
582                    });
583                }
584                DynSolValue::Uint(v, nb) => {
585                    assert_eq!(num_bits, *nb);
586                    builder.append_value(match T::Native::try_from(*v) {
587                        Ok(v) => v,
588                        Err(_) => unreachable!(),
589                    });
590                }
591                _ => {
592                    return Err(anyhow!(
593                        "found unexpected value. Expected: bool, Found: {:?}",
594                        val
595                    ));
596                }
597            },
598            None => {
599                builder.append_null();
600            }
601        }
602    }
603
604    Ok(Arc::new(builder.finish()))
605}
606
607fn to_list(
608    sol_type: &DynSolType,
609    sol_values: Vec<Option<DynSolValue>>,
610    allow_decode_fail: bool,
611) -> Result<Arc<dyn Array>> {
612    let mut lengths = Vec::with_capacity(sol_values.len());
613    let mut values = Vec::with_capacity(sol_values.len() * 2);
614    let mut validity = Vec::with_capacity(sol_values.len() * 2);
615
616    let mut all_valid = true;
617
618    for val in sol_values {
619        match val {
620            Some(val) => match val {
621                DynSolValue::Array(inner_vals) | DynSolValue::FixedArray(inner_vals) => {
622                    lengths.push(inner_vals.len());
623                    values.extend(inner_vals.into_iter().map(Some));
624                    validity.push(true);
625                }
626                _ => {
627                    return Err(anyhow!(
628                        "found unexpected value. Expected list type, Found: {:?}",
629                        val
630                    ));
631                }
632            },
633            None => {
634                lengths.push(0);
635                validity.push(false);
636                all_valid = false;
637            }
638        }
639    }
640
641    let values = to_arrow(sol_type, values, allow_decode_fail).context("map inner")?;
642    let field = Field::new(
643        "",
644        to_arrow_dtype(sol_type).context("construct data type")?,
645        true,
646    );
647    let list_arr = ListArray::try_new(
648        Arc::new(field),
649        OffsetBuffer::from_lengths(lengths),
650        values,
651        if all_valid {
652            None
653        } else {
654            Some(NullBuffer::from(validity))
655        },
656    )
657    .context("construct list array")?;
658    Ok(Arc::new(list_arr))
659}
660
661fn to_struct(
662    fields: &[DynSolType],
663    sol_values: Vec<Option<DynSolValue>>,
664    allow_decode_fail: bool,
665) -> Result<Arc<dyn Array>> {
666    let mut values = vec![Vec::with_capacity(sol_values.len()); fields.len()];
667
668    // unpack top layer of sol_values into columnar format
669    // since we recurse by calling to_arrow later in the function, this will eventually map to
670    // primitive types.
671    for val in sol_values.iter() {
672        match val {
673            Some(val) => match val {
674                DynSolValue::Tuple(inner_vals) => {
675                    if values.len() != inner_vals.len() {
676                        return Err(anyhow!(
677                            "found unexpected length tuple value. Expected: {}, Found: {}",
678                            values.len(),
679                            inner_vals.len()
680                        ));
681                    }
682                    for (v, inner) in values.iter_mut().zip(inner_vals) {
683                        v.push(Some(inner.clone()));
684                    }
685                }
686                _ => {
687                    return Err(anyhow!(
688                        "found unexpected value. Expected: tuple, Found: {:?}",
689                        val
690                    ));
691                }
692            },
693            None => {
694                for v in values.iter_mut() {
695                    v.push(None);
696                }
697            }
698        }
699    }
700
701    let mut arrays = Vec::with_capacity(fields.len());
702
703    for (sol_type, arr_vals) in fields.iter().zip(values.into_iter()) {
704        arrays.push(to_arrow(sol_type, arr_vals, allow_decode_fail)?);
705    }
706
707    let fields = arrays
708        .iter()
709        .enumerate()
710        .map(|(i, arr)| Field::new(format!("param{}", i), arr.data_type().clone(), true))
711        .collect::<Vec<_>>();
712    let schema = Arc::new(Schema::new(fields));
713
714    let batch = RecordBatch::try_new(schema, arrays).context("construct record batch")?;
715
716    Ok(Arc::new(StructArray::from(batch)))
717}
718
719fn to_bool(sol_values: &[Option<DynSolValue>]) -> Result<Arc<dyn Array>> {
720    let mut builder = builder::BooleanBuilder::new();
721
722    for val in sol_values.iter() {
723        match val {
724            Some(val) => match val {
725                DynSolValue::Bool(b) => {
726                    builder.append_value(*b);
727                }
728                _ => {
729                    return Err(anyhow!(
730                        "found unexpected value. Expected: bool, Found: {:?}",
731                        val
732                    ));
733                }
734            },
735            None => {
736                builder.append_null();
737            }
738        }
739    }
740
741    Ok(Arc::new(builder.finish()))
742}
743
744fn to_binary(sol_values: &[Option<DynSolValue>]) -> Result<Arc<dyn Array>> {
745    let mut builder = builder::BinaryBuilder::new();
746
747    for val in sol_values.iter() {
748        match val {
749            Some(val) => match val {
750                DynSolValue::Bytes(data) => {
751                    builder.append_value(data);
752                }
753                DynSolValue::FixedBytes(data, _) => {
754                    builder.append_value(data);
755                }
756                DynSolValue::Address(data) => {
757                    builder.append_value(data);
758                }
759                DynSolValue::Uint(v, _) => {
760                    builder.append_value(v.to_be_bytes::<32>());
761                }
762                DynSolValue::Int(v, _) => {
763                    builder.append_value(v.to_be_bytes::<32>());
764                }
765                _ => {
766                    return Err(anyhow!(
767                        "found unexpected value. Expected a binary type, Found: {:?}",
768                        val
769                    ));
770                }
771            },
772            None => {
773                builder.append_null();
774            }
775        }
776    }
777
778    Ok(Arc::new(builder.finish()))
779}
780
781fn to_string(sol_values: &[Option<DynSolValue>]) -> Result<Arc<dyn Array>> {
782    let mut builder = builder::StringBuilder::new();
783
784    for val in sol_values.iter() {
785        match val {
786            Some(val) => match val {
787                DynSolValue::String(s) => {
788                    builder.append_value(s);
789                }
790                _ => {
791                    return Err(anyhow!(
792                        "found unexpected value. Expected string, Found: {:?}",
793                        val
794                    ));
795                }
796            },
797            None => {
798                builder.append_null();
799            }
800        }
801    }
802
803    Ok(Arc::new(builder.finish()))
804}
805
806#[cfg(test)]
807mod tests {
808    use super::*;
809
810    #[test]
811    #[ignore]
812    fn nested_event_signature_to_schema() {
813        let sig = "ConfiguredQuests(address editor, uint256[][], address indexed my_addr, (bool,bool[],(bool, uint256[]))[] questDetails)";
814
815        let schema = event_signature_to_arrow_schema(sig).unwrap();
816
817        let expected_schema = Schema::new(vec![
818            Arc::new(Field::new("my_addr", DataType::Binary, true)),
819            Arc::new(Field::new("editor", DataType::Binary, true)),
820            Arc::new(Field::new(
821                "param1",
822                DataType::List(Arc::new(Field::new(
823                    "",
824                    DataType::List(Arc::new(Field::new("", DataType::Decimal256(76, 0), true))),
825                    true,
826                ))),
827                true,
828            )),
829            Arc::new(Field::new(
830                "questDetails",
831                DataType::List(Arc::new(Field::new(
832                    "",
833                    DataType::Struct(Fields::from(vec![
834                        Arc::new(Field::new("param0", DataType::Boolean, true)),
835                        Arc::new(Field::new(
836                            "param1",
837                            DataType::List(Arc::new(Field::new("", DataType::Boolean, true))),
838                            true,
839                        )),
840                        Arc::new(Field::new(
841                            "param2",
842                            DataType::Struct(Fields::from(vec![
843                                Arc::new(Field::new("param0", DataType::Boolean, true)),
844                                Arc::new(Field::new(
845                                    "param1",
846                                    DataType::List(Arc::new(Field::new(
847                                        "",
848                                        DataType::Decimal256(76, 0),
849                                        true,
850                                    ))),
851                                    true,
852                                )),
853                            ])),
854                            true,
855                        )),
856                    ])),
857                    true,
858                ))),
859                true,
860            )),
861        ]);
862
863        assert_eq!(schema, expected_schema);
864    }
865
866    #[test]
867    #[ignore]
868    fn i256_to_arrow_i256() {
869        for val in [
870            I256::MIN,
871            I256::MAX,
872            I256::MAX / I256::try_from(2i32).unwrap(),
873        ] {
874            let out = arrow::datatypes::i256::from_be_bytes(val.to_be_bytes::<32>());
875
876            assert_eq!(val.to_string(), out.to_string());
877        }
878    }
879
880    #[test]
881    #[ignore]
882    fn read_parquet_with_real_data() {
883        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
884        use std::fs::File;
885        let builder =
886            ParquetRecordBatchReaderBuilder::try_new(File::open("logs.parquet").unwrap()).unwrap();
887        let mut reader = builder.build().unwrap();
888        let logs = reader.next().unwrap().unwrap();
889
890        let signature =
891            "PairCreated(address indexed token0, address indexed token1, address pair,uint256)";
892
893        let decoded = decode_events(signature, &logs, false).unwrap();
894
895        // Save the filtered instructions to a new parquet file
896        let mut file = File::create("decoded_logs.parquet").unwrap();
897        let mut writer =
898            parquet::arrow::ArrowWriter::try_new(&mut file, decoded.schema(), None).unwrap();
899        writer.write(&decoded).unwrap();
900        writer.close().unwrap();
901    }
902}