1use std::sync::Arc;
2
3use alloy_dyn_abi::{DynSolCall, DynSolEvent, DynSolType, DynSolValue, Specifier};
4use alloy_primitives::{I256, U256};
5use anyhow::{anyhow, Context, Result};
6use arrow::{
7 array::{
8 builder, Array, ArrowPrimitiveType, BinaryArray, GenericBinaryArray, LargeBinaryArray,
9 ListArray, OffsetSizeTrait, RecordBatch, StructArray,
10 },
11 buffer::{NullBuffer, OffsetBuffer},
12 datatypes::{
13 DataType, Field, Fields, Int16Type, Int32Type, Int64Type, Int8Type, Schema, UInt16Type,
14 UInt32Type, UInt64Type, UInt8Type,
15 },
16};
17
18pub fn signature_to_topic0(signature: &str) -> Result<[u8; 32]> {
20 let event = alloy_json_abi::Event::parse(signature).context("parse event signature")?;
21 Ok(event.selector().into())
22}
23
24pub fn decode_call_inputs<I: OffsetSizeTrait>(
31 signature: &str,
32 data: &GenericBinaryArray<I>,
33 allow_decode_fail: bool,
34) -> Result<RecordBatch> {
35 decode_call_impl::<true, I>(signature, data, allow_decode_fail)
36}
37
38pub fn decode_call_outputs<I: OffsetSizeTrait>(
45 signature: &str,
46 data: &GenericBinaryArray<I>,
47 allow_decode_fail: bool,
48) -> Result<RecordBatch> {
49 decode_call_impl::<false, I>(signature, data, allow_decode_fail)
50}
51
52fn decode_call_impl<const IS_INPUT: bool, I: OffsetSizeTrait>(
55 signature: &str,
56 data: &GenericBinaryArray<I>,
57 allow_decode_fail: bool,
58) -> Result<RecordBatch> {
59 let (call, resolved) = resolve_function_signature(signature)?;
60
61 let schema = function_signature_to_arrow_schemas_impl(&call, &resolved)
62 .context("convert event signature to arrow schema")?;
63 let schema = if IS_INPUT { schema.0 } else { schema.1 };
64
65 let mut arrays: Vec<Arc<dyn Array + 'static>> = Vec::with_capacity(schema.fields().len());
66
67 let mut decoded = Vec::<Option<DynSolValue>>::with_capacity(data.len());
68
69 for blob in data.iter() {
70 match blob {
71 Some(blob) => {
72 let decode_res = if IS_INPUT {
73 resolved.abi_decode_input(blob)
74 } else {
75 resolved.abi_decode_output(blob)
76 };
77 match decode_res {
78 Ok(data) => decoded.push(Some(DynSolValue::Tuple(data))),
79 Err(e) if allow_decode_fail => {
80 log::debug!("failed to decode function call data: {}", e);
81 decoded.push(None);
82 }
83 Err(e) => {
84 return Err(anyhow!("failed to decode function call data: {}", e));
85 }
86 }
87 }
88 None => decoded.push(None),
89 }
90 }
91
92 let sol_type = if IS_INPUT {
93 DynSolType::Tuple(resolved.types().to_vec())
94 } else {
95 DynSolType::Tuple(resolved.returns().types().to_vec())
96 };
97
98 let array = to_arrow(&sol_type, decoded, allow_decode_fail).context("map params to arrow")?;
99 match array.data_type() {
100 DataType::Struct(_) => {
101 let arr = array.as_any().downcast_ref::<StructArray>().unwrap();
102
103 for f in arr.columns().iter() {
104 arrays.push(f.clone());
105 }
106 }
107 _ => unreachable!(),
108 }
109
110 RecordBatch::try_new(Arc::new(schema), arrays).context("construct arrow batch")
111}
112
113pub fn function_signature_to_arrow_schemas(signature: &str) -> Result<(Schema, Schema)> {
115 let (func, resolved) = resolve_function_signature(signature)?;
116 function_signature_to_arrow_schemas_impl(&func, &resolved)
117}
118
119fn function_signature_to_arrow_schemas_impl(
120 func: &alloy_json_abi::Function,
121 call: &DynSolCall,
122) -> Result<(Schema, Schema)> {
123 let mut input_fields = Vec::with_capacity(call.types().len());
124 let mut output_fields = Vec::with_capacity(call.returns().types().len());
125
126 for (i, (sol_t, param)) in call.types().iter().zip(func.inputs.iter()).enumerate() {
127 let dtype = to_arrow_dtype(sol_t).context("map to arrow type")?;
128 let name = if param.name() == "" {
129 format!("param{}", i)
130 } else {
131 param.name().to_owned()
132 };
133 input_fields.push(Arc::new(Field::new(name, dtype, true)));
134 }
135
136 for (i, (sol_t, param)) in call
137 .returns()
138 .types()
139 .iter()
140 .zip(func.outputs.iter())
141 .enumerate()
142 {
143 let dtype = to_arrow_dtype(sol_t).context("map to arrow type")?;
144 let name = if param.name() == "" {
145 format!("param{}", i)
146 } else {
147 param.name().to_owned()
148 };
149 output_fields.push(Arc::new(Field::new(name, dtype, true)));
150 }
151
152 Ok((Schema::new(input_fields), Schema::new(output_fields)))
153}
154
155fn resolve_function_signature(signature: &str) -> Result<(alloy_json_abi::Function, DynSolCall)> {
156 let event = alloy_json_abi::Function::parse(signature).context("parse function signature")?;
157 let resolved = event.resolve().context("resolve function signature")?;
158
159 Ok((event, resolved))
160}
161
162pub fn decode_events(
169 signature: &str,
170 data: &RecordBatch,
171 allow_decode_fail: bool,
172) -> Result<RecordBatch> {
173 let (event, resolved) = resolve_event_signature(signature)?;
174
175 let schema = event_signature_to_arrow_schema_impl(&event, &resolved)
176 .context("convert event signature to arrow schema")?;
177
178 let mut arrays: Vec<Arc<dyn Array + 'static>> = Vec::with_capacity(schema.fields().len());
179
180 for (sol_type, topic_name) in resolved
181 .indexed()
182 .iter()
183 .zip(["topic1", "topic2", "topic3"].iter())
184 {
185 let col = data
186 .column_by_name(topic_name)
187 .context("get topic column")?;
188
189 if col.data_type() == &DataType::Binary {
190 decode_topic(
191 sol_type,
192 col.as_any().downcast_ref::<BinaryArray>().unwrap(),
193 allow_decode_fail,
194 &mut arrays,
195 )
196 .context("decode topic")?;
197 } else if col.data_type() == &DataType::LargeBinary {
198 decode_topic(
199 sol_type,
200 col.as_any().downcast_ref::<LargeBinaryArray>().unwrap(),
201 allow_decode_fail,
202 &mut arrays,
203 )
204 .context("decode topic")?;
205 }
206 }
207
208 let body_col = data.column_by_name("data").context("get data column")?;
209
210 let body_sol_type = DynSolType::Tuple(resolved.body().to_vec());
211
212 if body_col.data_type() == &DataType::Binary {
213 decode_body(
214 &body_sol_type,
215 body_col.as_any().downcast_ref::<BinaryArray>().unwrap(),
216 allow_decode_fail,
217 &mut arrays,
218 )
219 .context("decode body")?;
220 } else if body_col.data_type() == &DataType::LargeBinary {
221 decode_body(
222 &body_sol_type,
223 body_col
224 .as_any()
225 .downcast_ref::<LargeBinaryArray>()
226 .unwrap(),
227 allow_decode_fail,
228 &mut arrays,
229 )
230 .context("decode body")?;
231 }
232
233 RecordBatch::try_new(Arc::new(schema), arrays).context("construct arrow batch")
234}
235
236fn decode_body<I: OffsetSizeTrait>(
237 body_sol_type: &DynSolType,
238 body_col: &GenericBinaryArray<I>,
239 allow_decode_fail: bool,
240 arrays: &mut Vec<Arc<dyn Array>>,
241) -> Result<()> {
242 let mut body_decoded = Vec::<Option<DynSolValue>>::with_capacity(body_col.len());
243
244 for blob in body_col.iter() {
245 match blob {
246 Some(blob) => match body_sol_type.abi_decode_sequence(blob) {
247 Ok(data) => body_decoded.push(Some(data)),
248 Err(e) if allow_decode_fail => {
249 log::debug!("failed to decode body: {}", e);
250 body_decoded.push(None);
251 }
252 Err(e) => {
253 return Err(anyhow!("failed to decode body: {}", e));
254 }
255 },
256 None => body_decoded.push(None),
257 }
258 }
259
260 let body_array =
261 to_arrow(body_sol_type, body_decoded, allow_decode_fail).context("map body to arrow")?;
262 match body_array.data_type() {
263 DataType::Struct(_) => {
264 let arr = body_array.as_any().downcast_ref::<StructArray>().unwrap();
265
266 for f in arr.columns().iter() {
267 arrays.push(f.clone());
268 }
269 }
270 _ => unreachable!(),
271 }
272
273 Ok(())
274}
275
276fn decode_topic<I: OffsetSizeTrait>(
277 sol_type: &DynSolType,
278 col: &GenericBinaryArray<I>,
279 allow_decode_fail: bool,
280 arrays: &mut Vec<Arc<dyn Array>>,
281) -> Result<()> {
282 let mut decoded = Vec::<Option<DynSolValue>>::with_capacity(col.len());
283
284 for blob in col.iter() {
285 match blob {
286 Some(blob) => match sol_type.abi_decode(blob) {
287 Ok(data) => decoded.push(Some(data)),
288 Err(e) if allow_decode_fail => {
289 log::debug!("failed to decode a topic: {}", e);
290 decoded.push(None);
291 }
292 Err(e) => {
293 return Err(anyhow!("failed to decode a topic: {}", e));
294 }
295 },
296 None => decoded.push(None),
297 }
298 }
299
300 arrays.push(to_arrow(sol_type, decoded, allow_decode_fail).context("map topic to arrow")?);
301
302 Ok(())
303}
304
305pub fn event_signature_to_arrow_schema(signature: &str) -> Result<Schema> {
307 let (resolved, event) = resolve_event_signature(signature)?;
308 event_signature_to_arrow_schema_impl(&resolved, &event)
309}
310
311fn event_signature_to_arrow_schema_impl(
312 sig: &alloy_json_abi::Event,
313 event: &DynSolEvent,
314) -> Result<Schema> {
315 let num_fields = event.indexed().len() + event.body().len();
316 let mut fields = Vec::<Arc<Field>>::with_capacity(num_fields);
317 let mut names = Vec::with_capacity(num_fields);
318
319 for (i, input) in sig.inputs.iter().enumerate() {
320 if input.indexed {
321 let name = if input.name.is_empty() {
322 format!("param{}", i)
323 } else {
324 input.name.clone()
325 };
326 names.push(name);
327 }
328 }
329 for (i, input) in sig.inputs.iter().enumerate() {
330 if !input.indexed {
331 let name = if input.name.is_empty() {
332 format!("param{}", i)
333 } else {
334 input.name.clone()
335 };
336 names.push(name);
337 }
338 }
339
340 for (sol_t, name) in event.indexed().iter().chain(event.body()).zip(names) {
341 let dtype = to_arrow_dtype(sol_t).context("map to arrow type")?;
342 fields.push(Arc::new(Field::new(name, dtype, true)));
343 }
344
345 Ok(Schema::new(fields))
346}
347
348fn resolve_event_signature(signature: &str) -> Result<(alloy_json_abi::Event, DynSolEvent)> {
349 let event = alloy_json_abi::Event::parse(signature).context("parse event signature")?;
350 let resolved = event.resolve().context("resolve event signature")?;
351
352 Ok((event, resolved))
353}
354
355fn to_arrow_dtype(sol_type: &DynSolType) -> Result<DataType> {
356 match sol_type {
357 DynSolType::Bool => Ok(DataType::Boolean),
358 DynSolType::Bytes => Ok(DataType::Binary),
359 DynSolType::String => Ok(DataType::Utf8),
360 DynSolType::Address => Ok(DataType::Binary),
361 DynSolType::Int(num_bits) => Ok(num_bits_to_int_type(*num_bits)),
362 DynSolType::Uint(num_bits) => Ok(num_bits_to_uint_type(*num_bits)),
363 DynSolType::Array(inner_type) => {
364 let inner_type = to_arrow_dtype(inner_type).context("map inner")?;
365 Ok(DataType::List(Arc::new(Field::new("", inner_type, true))))
366 }
367 DynSolType::Function => Err(anyhow!(
368 "decoding 'Function' typed value in function signature isn't supported."
369 )),
370 DynSolType::FixedArray(inner_type, _) => {
371 let inner_type = to_arrow_dtype(inner_type).context("map inner")?;
372 Ok(DataType::List(Arc::new(Field::new("", inner_type, true))))
373 }
374 DynSolType::Tuple(fields) => {
375 let mut arrow_fields = Vec::<Arc<Field>>::with_capacity(fields.len());
376
377 for (i, f) in fields.iter().enumerate() {
378 let inner_dt = to_arrow_dtype(f).context("map field dt")?;
379 arrow_fields.push(Arc::new(Field::new(format!("param{}", i), inner_dt, true)));
380 }
381
382 Ok(DataType::Struct(Fields::from(arrow_fields)))
383 }
384 DynSolType::FixedBytes(_) => Ok(DataType::Binary),
385 }
386}
387
388fn num_bits_to_uint_type(num_bits: usize) -> DataType {
389 if num_bits <= 8 {
390 DataType::UInt8
391 } else if num_bits <= 16 {
392 DataType::UInt16
393 } else if num_bits <= 32 {
394 DataType::UInt32
395 } else if num_bits <= 64 {
396 DataType::UInt64
397 } else if num_bits <= 128 {
398 DataType::Decimal128(38, 0)
399 } else if num_bits <= 256 {
400 DataType::Decimal256(76, 0)
401 } else {
402 unreachable!()
403 }
404}
405
406fn num_bits_to_int_type(num_bits: usize) -> DataType {
407 if num_bits <= 8 {
408 DataType::Int8
409 } else if num_bits <= 16 {
410 DataType::Int16
411 } else if num_bits <= 32 {
412 DataType::Int32
413 } else if num_bits <= 64 {
414 DataType::Int64
415 } else if num_bits <= 128 {
416 DataType::Decimal128(38, 0)
417 } else if num_bits <= 256 {
418 DataType::Decimal256(76, 0)
419 } else {
420 unreachable!()
421 }
422}
423
424fn to_arrow(
425 sol_type: &DynSolType,
426 sol_values: Vec<Option<DynSolValue>>,
427 allow_decode_fail: bool,
428) -> Result<Arc<dyn Array>> {
429 match sol_type {
430 DynSolType::Bool => to_bool(&sol_values),
431 DynSolType::Bytes => to_binary(&sol_values),
432 DynSolType::String => to_string(&sol_values),
433 DynSolType::Address => to_binary(&sol_values),
434 DynSolType::Int(num_bits) => to_int(*num_bits, &sol_values, allow_decode_fail),
435 DynSolType::Uint(num_bits) => to_uint(*num_bits, &sol_values, allow_decode_fail),
436 DynSolType::Array(inner_type) => to_list(inner_type, sol_values, allow_decode_fail),
437 DynSolType::Function => Err(anyhow!(
438 "decoding 'Function' typed value in function signature isn't supported."
439 )),
440 DynSolType::FixedArray(inner_type, _) => to_list(inner_type, sol_values, allow_decode_fail),
441 DynSolType::Tuple(fields) => to_struct(fields, sol_values, allow_decode_fail),
442 DynSolType::FixedBytes(_) => to_binary(&sol_values),
443 }
444}
445
446fn to_int(
447 num_bits: usize,
448 sol_values: &[Option<DynSolValue>],
449 allow_decode_fail: bool,
450) -> Result<Arc<dyn Array>> {
451 match num_bits_to_int_type(num_bits) {
452 DataType::Int8 => to_int_impl::<Int8Type>(num_bits, sol_values),
453 DataType::Int16 => to_int_impl::<Int16Type>(num_bits, sol_values),
454 DataType::Int32 => to_int_impl::<Int32Type>(num_bits, sol_values),
455 DataType::Int64 => to_int_impl::<Int64Type>(num_bits, sol_values),
456 DataType::Decimal128(_, _) => to_decimal128(num_bits, sol_values),
457 DataType::Decimal256(_, _) => to_decimal256(num_bits, sol_values, allow_decode_fail),
458 _ => unreachable!(),
459 }
460}
461
462fn to_uint(
463 num_bits: usize,
464 sol_values: &[Option<DynSolValue>],
465 allow_decode_fail: bool,
466) -> Result<Arc<dyn Array>> {
467 match num_bits_to_int_type(num_bits) {
468 DataType::UInt8 => to_int_impl::<UInt8Type>(num_bits, sol_values),
469 DataType::UInt16 => to_int_impl::<UInt16Type>(num_bits, sol_values),
470 DataType::UInt32 => to_int_impl::<UInt32Type>(num_bits, sol_values),
471 DataType::UInt64 => to_int_impl::<UInt64Type>(num_bits, sol_values),
472 DataType::Decimal128(_, _) => to_decimal128(num_bits, sol_values),
473 DataType::Decimal256(_, _) => to_decimal256(num_bits, sol_values, allow_decode_fail),
474 _ => unreachable!(),
475 }
476}
477
478fn to_decimal128(num_bits: usize, sol_values: &[Option<DynSolValue>]) -> Result<Arc<dyn Array>> {
479 let mut builder = builder::Decimal128Builder::new();
480
481 for val in sol_values.iter() {
482 match val {
483 Some(val) => match val {
484 DynSolValue::Int(v, nb) => {
485 assert_eq!(num_bits, *nb);
486
487 let v = i128::try_from(*v).context("convert to i128")?;
488
489 builder.append_value(v);
490 }
491 DynSolValue::Uint(v, nb) => {
492 assert_eq!(num_bits, *nb);
493
494 let v = i128::try_from(*v).context("convert to i128")?;
495
496 builder.append_value(v);
497 }
498 _ => {
499 return Err(anyhow!(
500 "found unexpected value. Expected: bool, Found: {:?}",
501 val
502 ));
503 }
504 },
505 None => {
506 builder.append_null();
507 }
508 }
509 }
510
511 builder = builder.with_data_type(DataType::Decimal128(38, 0));
512
513 Ok(Arc::new(builder.finish()))
514}
515
516fn to_decimal256(
517 num_bits: usize,
518 sol_values: &[Option<DynSolValue>],
519 allow_decode_fail: bool,
520) -> Result<Arc<dyn Array>> {
521 let mut builder = builder::Decimal256Builder::new();
522
523 for val in sol_values.iter() {
524 match val {
525 Some(val) => match val {
526 DynSolValue::Int(v, nb) => {
527 assert_eq!(num_bits, *nb);
528
529 let v = arrow::datatypes::i256::from_be_bytes(v.to_be_bytes::<32>());
530
531 builder.append_value(v);
532 }
533 DynSolValue::Uint(v, nb) => {
534 assert_eq!(num_bits, *nb);
535 match I256::try_from(*v).context("try u256 to i256") {
536 Ok(v) => builder.append_value(arrow::datatypes::i256::from_be_bytes(
537 v.to_be_bytes::<32>(),
538 )),
539 Err(e) => {
540 if allow_decode_fail {
541 log::debug!("failed to decode u256: {}", e);
542 builder.append_null();
543 } else {
544 return Err(e);
545 }
546 }
547 }
548 }
549 _ => {
550 return Err(anyhow!(
551 "found unexpected value. Expected: bool, Found: {:?}",
552 val
553 ));
554 }
555 },
556 None => {
557 builder.append_null();
558 }
559 }
560 }
561
562 builder = builder.with_data_type(DataType::Decimal256(76, 0));
563
564 Ok(Arc::new(builder.finish()))
565}
566
567fn to_int_impl<T>(num_bits: usize, sol_values: &[Option<DynSolValue>]) -> Result<Arc<dyn Array>>
568where
569 T: ArrowPrimitiveType,
570 T::Native: TryFrom<I256> + TryFrom<U256>,
571{
572 let mut builder = builder::PrimitiveBuilder::<T>::new();
573
574 for val in sol_values.iter() {
575 match val {
576 Some(val) => match val {
577 DynSolValue::Int(v, nb) => {
578 assert_eq!(num_bits, *nb);
579 builder.append_value(match T::Native::try_from(*v) {
580 Ok(v) => v,
581 Err(_) => unreachable!(),
582 });
583 }
584 DynSolValue::Uint(v, nb) => {
585 assert_eq!(num_bits, *nb);
586 builder.append_value(match T::Native::try_from(*v) {
587 Ok(v) => v,
588 Err(_) => unreachable!(),
589 });
590 }
591 _ => {
592 return Err(anyhow!(
593 "found unexpected value. Expected: bool, Found: {:?}",
594 val
595 ));
596 }
597 },
598 None => {
599 builder.append_null();
600 }
601 }
602 }
603
604 Ok(Arc::new(builder.finish()))
605}
606
607fn to_list(
608 sol_type: &DynSolType,
609 sol_values: Vec<Option<DynSolValue>>,
610 allow_decode_fail: bool,
611) -> Result<Arc<dyn Array>> {
612 let mut lengths = Vec::with_capacity(sol_values.len());
613 let mut values = Vec::with_capacity(sol_values.len() * 2);
614 let mut validity = Vec::with_capacity(sol_values.len() * 2);
615
616 let mut all_valid = true;
617
618 for val in sol_values {
619 match val {
620 Some(val) => match val {
621 DynSolValue::Array(inner_vals) | DynSolValue::FixedArray(inner_vals) => {
622 lengths.push(inner_vals.len());
623 values.extend(inner_vals.into_iter().map(Some));
624 validity.push(true);
625 }
626 _ => {
627 return Err(anyhow!(
628 "found unexpected value. Expected list type, Found: {:?}",
629 val
630 ));
631 }
632 },
633 None => {
634 lengths.push(0);
635 validity.push(false);
636 all_valid = false;
637 }
638 }
639 }
640
641 let values = to_arrow(sol_type, values, allow_decode_fail).context("map inner")?;
642 let field = Field::new(
643 "",
644 to_arrow_dtype(sol_type).context("construct data type")?,
645 true,
646 );
647 let list_arr = ListArray::try_new(
648 Arc::new(field),
649 OffsetBuffer::from_lengths(lengths),
650 values,
651 if all_valid {
652 None
653 } else {
654 Some(NullBuffer::from(validity))
655 },
656 )
657 .context("construct list array")?;
658 Ok(Arc::new(list_arr))
659}
660
661fn to_struct(
662 fields: &[DynSolType],
663 sol_values: Vec<Option<DynSolValue>>,
664 allow_decode_fail: bool,
665) -> Result<Arc<dyn Array>> {
666 let mut values = vec![Vec::with_capacity(sol_values.len()); fields.len()];
667
668 for val in sol_values.iter() {
672 match val {
673 Some(val) => match val {
674 DynSolValue::Tuple(inner_vals) => {
675 if values.len() != inner_vals.len() {
676 return Err(anyhow!(
677 "found unexpected length tuple value. Expected: {}, Found: {}",
678 values.len(),
679 inner_vals.len()
680 ));
681 }
682 for (v, inner) in values.iter_mut().zip(inner_vals) {
683 v.push(Some(inner.clone()));
684 }
685 }
686 _ => {
687 return Err(anyhow!(
688 "found unexpected value. Expected: tuple, Found: {:?}",
689 val
690 ));
691 }
692 },
693 None => {
694 for v in values.iter_mut() {
695 v.push(None);
696 }
697 }
698 }
699 }
700
701 let mut arrays = Vec::with_capacity(fields.len());
702
703 for (sol_type, arr_vals) in fields.iter().zip(values.into_iter()) {
704 arrays.push(to_arrow(sol_type, arr_vals, allow_decode_fail)?);
705 }
706
707 let fields = arrays
708 .iter()
709 .enumerate()
710 .map(|(i, arr)| Field::new(format!("param{}", i), arr.data_type().clone(), true))
711 .collect::<Vec<_>>();
712 let schema = Arc::new(Schema::new(fields));
713
714 let batch = RecordBatch::try_new(schema, arrays).context("construct record batch")?;
715
716 Ok(Arc::new(StructArray::from(batch)))
717}
718
719fn to_bool(sol_values: &[Option<DynSolValue>]) -> Result<Arc<dyn Array>> {
720 let mut builder = builder::BooleanBuilder::new();
721
722 for val in sol_values.iter() {
723 match val {
724 Some(val) => match val {
725 DynSolValue::Bool(b) => {
726 builder.append_value(*b);
727 }
728 _ => {
729 return Err(anyhow!(
730 "found unexpected value. Expected: bool, Found: {:?}",
731 val
732 ));
733 }
734 },
735 None => {
736 builder.append_null();
737 }
738 }
739 }
740
741 Ok(Arc::new(builder.finish()))
742}
743
744fn to_binary(sol_values: &[Option<DynSolValue>]) -> Result<Arc<dyn Array>> {
745 let mut builder = builder::BinaryBuilder::new();
746
747 for val in sol_values.iter() {
748 match val {
749 Some(val) => match val {
750 DynSolValue::Bytes(data) => {
751 builder.append_value(data);
752 }
753 DynSolValue::FixedBytes(data, _) => {
754 builder.append_value(data);
755 }
756 DynSolValue::Address(data) => {
757 builder.append_value(data);
758 }
759 DynSolValue::Uint(v, _) => {
760 builder.append_value(v.to_be_bytes::<32>());
761 }
762 DynSolValue::Int(v, _) => {
763 builder.append_value(v.to_be_bytes::<32>());
764 }
765 _ => {
766 return Err(anyhow!(
767 "found unexpected value. Expected a binary type, Found: {:?}",
768 val
769 ));
770 }
771 },
772 None => {
773 builder.append_null();
774 }
775 }
776 }
777
778 Ok(Arc::new(builder.finish()))
779}
780
781fn to_string(sol_values: &[Option<DynSolValue>]) -> Result<Arc<dyn Array>> {
782 let mut builder = builder::StringBuilder::new();
783
784 for val in sol_values.iter() {
785 match val {
786 Some(val) => match val {
787 DynSolValue::String(s) => {
788 builder.append_value(s);
789 }
790 _ => {
791 return Err(anyhow!(
792 "found unexpected value. Expected string, Found: {:?}",
793 val
794 ));
795 }
796 },
797 None => {
798 builder.append_null();
799 }
800 }
801 }
802
803 Ok(Arc::new(builder.finish()))
804}
805
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a nullable field wrapped in an `Arc`, as used throughout the expected schema.
    fn nullable(name: &str, dtype: DataType) -> Arc<Field> {
        Arc::new(Field::new(name, dtype, true))
    }

    /// Builds a list type whose item field is unnamed and nullable.
    fn list_of(item: DataType) -> DataType {
        DataType::List(nullable("", item))
    }

    /// Checks the schema for an event with nested arrays and tuples: the
    /// indexed parameter comes first and unnamed parameters are called
    /// `param{i}`.
    #[test]
    #[ignore]
    fn nested_event_signature_to_schema() {
        let sig = "ConfiguredQuests(address editor, uint256[][], address indexed my_addr, (bool,bool[],(bool, uint256[]))[] questDetails)";

        let schema = event_signature_to_arrow_schema(sig).unwrap();

        let uint256 = || DataType::Decimal256(76, 0);

        let inner_tuple = DataType::Struct(Fields::from(vec![
            nullable("param0", DataType::Boolean),
            nullable("param1", list_of(uint256())),
        ]));
        let quest_tuple = DataType::Struct(Fields::from(vec![
            nullable("param0", DataType::Boolean),
            nullable("param1", list_of(DataType::Boolean)),
            nullable("param2", inner_tuple),
        ]));

        let expected_schema = Schema::new(vec![
            nullable("my_addr", DataType::Binary),
            nullable("editor", DataType::Binary),
            nullable("param1", list_of(list_of(uint256()))),
            nullable("questDetails", list_of(quest_tuple)),
        ]);

        assert_eq!(schema, expected_schema);
    }

    /// Checks that reinterpreting an `I256` through its big-endian bytes
    /// yields an Arrow `i256` with the same decimal representation.
    #[test]
    #[ignore]
    fn i256_to_arrow_i256() {
        let half_max = I256::MAX / I256::try_from(2i32).unwrap();

        for sample in [I256::MIN, I256::MAX, half_max] {
            let converted = arrow::datatypes::i256::from_be_bytes(sample.to_be_bytes::<32>());

            assert_eq!(sample.to_string(), converted.to_string());
        }
    }

    /// Decodes the first batch of `logs.parquet` and writes the result to
    /// `decoded_logs.parquet`.
    #[test]
    #[ignore]
    fn read_parquet_with_real_data() {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use std::fs::File;

        let input = File::open("logs.parquet").unwrap();
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(input)
            .unwrap()
            .build()
            .unwrap();
        let logs = reader.next().unwrap().unwrap();

        let signature =
            "PairCreated(address indexed token0, address indexed token1, address pair,uint256)";

        let decoded = decode_events(signature, &logs, false).unwrap();

        let mut file = File::create("decoded_logs.parquet").unwrap();
        let mut writer =
            parquet::arrow::ArrowWriter::try_new(&mut file, decoded.schema(), None).unwrap();
        writer.write(&decoded).unwrap();
        writer.close().unwrap();
    }
}