use std::{
    collections::{BTreeMap, VecDeque},
    path::PathBuf,
    sync::Arc,
    time::Instant,
};

use alloy_dyn_abi::{DynSolType, DynSolValue, Specifier};
use alloy_json_abi::EventParam;
use anyhow::{anyhow, Context, Result};
use polars_arrow::{
    array::{Array, BinaryViewArray, MutableArray, MutableBooleanArray, Utf8ViewArray},
    datatypes::{ArrowDataType as DataType, ArrowSchema as Schema, Field},
};
use polars_arrow::{
    array::{ArrayFromIter, BinaryArray, MutableBinaryViewArray, Utf8Array},
    legacy::error::PolarsError,
};
use polars_parquet::parquet::write::FileStreamer;
use polars_parquet::{
    read::ParquetError,
    write::{
        array_to_columns, to_parquet_schema, to_parquet_type, transverse, CompressedPage, DynIter,
        DynStreamingIterator, Encoding, FallibleStreamingIterator, RowGroupIter, WriteOptions,
    },
};
use rayon::prelude::*;
use skar_net_types::Query;
use skar_schema::{concat_chunks, empty_chunk};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::compat::TokioAsyncReadCompatExt;

use crate::{
    column_mapping, rayon_async, types::StreamConfig, ArrowBatch, ArrowChunk, Client, ParquetConfig,
};

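/// Streams the results of `query` from the server and writes them under `config.path` as
/// `blocks.parquet`, `transactions.parquet`, `logs.parquet`, `traces.parquet` and
/// `decoded_logs.parquet`. Each file gets its own writer task, and the chunks of every response
/// are fanned out to those writers concurrently; logs are additionally decoded with
/// `config.event_signature` (when set) to produce the decoded_logs output.
///
/// A minimal usage sketch, assuming a `client`, `query` and `config` have already been
/// constructed elsewhere:
///
/// ```ignore
/// create_parquet_folder(&client, query, config).await?;
/// ```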
pub async fn create_parquet_folder(
    client: &Client,
    query: Query,
    config: ParquetConfig,
) -> Result<()> {
    let path = PathBuf::from(config.path);

    tokio::fs::create_dir_all(&path)
        .await
        .context("create parquet dir")?;

    let mut blocks_path = path.clone();
    blocks_path.push("blocks.parquet");
    let (mut blocks_sender, blocks_join) =
        spawn_writer(blocks_path, &config.column_mapping.block, config.hex_output)?;

    let mut transactions_path = path.clone();
    transactions_path.push("transactions.parquet");
    let (mut transactions_sender, transactions_join) = spawn_writer(
        transactions_path,
        &config.column_mapping.transaction,
        config.hex_output,
    )?;

    let mut logs_path = path.clone();
    logs_path.push("logs.parquet");
    let (mut logs_sender, logs_join) =
        spawn_writer(logs_path, &config.column_mapping.log, config.hex_output)?;

    let mut traces_path = path.clone();
    traces_path.push("traces.parquet");
    let (mut traces_sender, traces_join) =
        spawn_writer(traces_path, &config.column_mapping.trace, config.hex_output)?;

    let event_signature = match &config.event_signature {
        Some(sig) => Some(alloy_json_abi::Event::parse(sig).context("parse event signature")?),
        None => None,
    };

    let mut decoded_logs_path = path.clone();
    decoded_logs_path.push("decoded_logs.parquet");
    let (mut decoded_logs_sender, decoded_logs_join) = spawn_writer(
        decoded_logs_path,
        &config.column_mapping.decoded_log,
        config.hex_output,
    )?;

    let mut rx = client
        .stream::<crate::ArrowIpc>(
            query,
            StreamConfig {
                concurrency: config.concurrency,
                batch_size: config.batch_size,
                retry: config.retry,
            },
        )
        .await
        .context("start stream")?;

    while let Some(resp) = rx.recv().await {
        let resp = resp.context("get query response")?;

        log::trace!("got data up to block {}", resp.next_block);

        let blocks_fut = async move {
            for batch in resp.data.blocks {
                let batch = map_batch_to_binary_view(batch);
                blocks_sender
                    .send(batch)
                    .await
                    .context("write blocks chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(blocks_sender)
        };

        let txs_fut = async move {
            for batch in resp.data.transactions {
                let batch = map_batch_to_binary_view(batch);
                transactions_sender
                    .send(batch)
                    .await
                    .context("write transactions chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(transactions_sender)
        };

        let logs_fut = {
            let data = resp.data.logs.clone();
            async move {
                for batch in data {
                    let batch = map_batch_to_binary_view(batch);
                    logs_sender
                        .send(batch)
                        .await
                        .context("write logs chunk to parquet")?;
                }

                Ok::<_, anyhow::Error>(logs_sender)
            }
        };

        let traces_fut = async move {
            for batch in resp.data.traces {
                let batch = map_batch_to_binary_view(batch);
                traces_sender
                    .send(batch)
                    .await
                    .context("write traces chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(traces_sender)
        };

        let sig = Arc::new(event_signature.clone());
        let decoded_logs_fut = async move {
            for batch in resp.data.logs {
                let sig = sig.clone();
                let batch = map_batch_to_binary_view(batch);
                let batch = rayon_async::spawn(move || decode_logs_batch(&sig, batch))
                    .await
                    .context("join decode logs task")?
                    .context("decode logs")?;

                decoded_logs_sender
                    .send(batch)
                    .await
                    .context("write decoded_logs chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(decoded_logs_sender)
        };

        let start = Instant::now();

        (
            blocks_sender,
            transactions_sender,
            logs_sender,
            traces_sender,
            decoded_logs_sender,
        ) = futures::future::try_join5(blocks_fut, txs_fut, logs_fut, traces_fut, decoded_logs_fut)
            .await
            .context("write to parquet")?;

        log::trace!("wrote to parquet in {} ms", start.elapsed().as_millis());
    }

    std::mem::drop(blocks_sender);
    std::mem::drop(transactions_sender);
    std::mem::drop(logs_sender);
    std::mem::drop(traces_sender);
    std::mem::drop(decoded_logs_sender);

    blocks_join
        .await
        .context("join blocks task")?
        .context("finish blocks file")?;
    transactions_join
        .await
        .context("join transactions task")?
        .context("finish transactions file")?;
    logs_join
        .await
        .context("join logs task")?
        .context("finish logs file")?;
    traces_join
        .await
        .context("join traces task")?
        .context("finish traces file")?;
    decoded_logs_join
        .await
        .context("join decoded_logs task")?
        .context("finish decoded_logs file")?;

    Ok(())
}

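// Hex-encodes every BinaryView column of a chunk in parallel, leaving other columns untouched.
// Used when `hex_output` is enabled so binary data ends up as hex strings in the parquet output.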
fn hex_encode_chunk(chunk: &ArrowChunk) -> anyhow::Result<ArrowChunk> {
    let cols = chunk
        .columns()
        .par_iter()
        .map(|col| {
            let col = match col.data_type() {
                DataType::BinaryView => Box::new(hex_encode(col.as_any().downcast_ref().unwrap())),
                _ => col.clone(),
            };

            Ok::<_, anyhow::Error>(col)
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(ArrowChunk::new(cols))
}

fn hex_encode(input: &BinaryViewArray) -> Utf8ViewArray {
    let mut arr = MutableBinaryViewArray::<str>::new();

    for buf in input.iter() {
        arr.push(buf.map(faster_hex::hex_string));
    }

    arr.into()
}

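// Spawns the writer task for a single parquet file and returns the channel used to feed it
// batches along with the task's join handle.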
fn spawn_writer(
    path: PathBuf,
    mapping: &BTreeMap<String, column_mapping::DataType>,
    hex_output: bool,
) -> Result<(mpsc::Sender<ArrowBatch>, JoinHandle<Result<()>>)> {
    let (tx, rx) = mpsc::channel(64);

    let mapping = Arc::new(mapping.clone());

    let handle = tokio::task::spawn(async move {
        match run_writer(rx, path, mapping, hex_output).await {
            Ok(v) => Ok(v),
            Err(e) => {
                log::error!("failed to run parquet writer: {:?}", e);
                Err(e)
            }
        }
    });

    Ok((tx, handle))
}

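// Receives batches for a single file, buffers them until roughly `ROW_GROUP_MAX_ROWS` rows have
// accumulated, and then encodes each buffered group into a parquet row group on the rayon pool.
// At most `num_cpus` encode jobs are kept in flight; when the queue is full the oldest job is
// awaited and its row group written out before a new one is queued. The file writer itself is
// created lazily from the schema of the first encoded row group, since column mapping and hex
// encoding can change the column types.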
async fn run_writer(
    mut rx: mpsc::Receiver<ArrowBatch>,
    path: PathBuf,
    mapping: Arc<BTreeMap<String, crate::DataType>>,
    hex_output: bool,
) -> Result<()> {
    let make_writer = move |schema: &Schema| {
        let schema = schema.clone();
        let path = path.clone();
        async move {
            let write_options = polars_parquet::parquet::write::WriteOptions {
                write_statistics: true,
                version: polars_parquet::parquet::write::Version::V2,
            };

            let file = tokio::fs::File::create(&path)
                .await
                .context("create parquet file")?
                .compat();

            let parquet_schema = to_parquet_schema(&schema).context("to parquet schema")?;

            let writer = FileStreamer::new(file, parquet_schema, write_options, None);

            Ok::<_, anyhow::Error>(writer)
        }
    };

    let mut writer = None;

    let num_cpus = num_cpus::get();
    let mut encode_jobs = VecDeque::<EncodeFut>::with_capacity(num_cpus);

    let mut data = Vec::new();
    let mut total_rows = 0;
    loop {
        let mut stop = false;
        if let Some(batch) = rx.recv().await {
            total_rows += batch.chunk.len();
            data.push(batch);
        } else {
            stop = true;
        }

        if !data.is_empty() && (stop || total_rows >= ROW_GROUP_MAX_ROWS) {
            let batches = std::mem::take(&mut data);
            if encode_jobs.len() >= num_cpus {
                let fut = encode_jobs.pop_front().unwrap();
                let (rg, schema) = fut
                    .await
                    .context("join prepare task")?
                    .context("prepare row group")?;
                if writer.is_none() {
                    writer = Some(make_writer(&schema).await.context("create writer")?);
                }
                writer
                    .as_mut()
                    .unwrap()
                    .write(rg)
                    .await
                    .context("write encoded row group to file")?;
            }

            total_rows = 0;
            let schema = batches[0].schema.clone();
            let chunks = batches.into_iter().map(|b| b.chunk).collect::<Vec<_>>();

            let chunk = concat_chunks(chunks.as_slice()).context("concat chunks")?;
            let mapping = mapping.clone();
            let fut = rayon_async::spawn(move || {
                let field_names = schema
                    .fields
                    .iter()
                    .map(|f| f.name.as_str())
                    .collect::<Vec<&str>>();
                let chunk = column_mapping::apply_to_chunk(&chunk, &field_names, &mapping)
                    .context("apply column mapping to batch")?;

                let chunk = if hex_output {
                    hex_encode_chunk(&chunk).context("hex encode batch")?
                } else {
                    chunk
                };
                let chunk = Arc::new(chunk);

                let schema = chunk
                    .iter()
                    .zip(schema.fields.iter())
                    .map(|(col, field)| {
                        Field::new(
                            field.name.clone(),
                            col.data_type().clone(),
                            field.is_nullable,
                        )
                    })
                    .collect::<Vec<_>>();
                let schema = Arc::new(Schema::from(schema));

                let rg = encode_row_group(
                    ArrowBatch {
                        chunk,
                        schema: schema.clone(),
                    },
                    WriteOptions {
                        write_statistics: true,
                        version: polars_parquet::write::Version::V2,
                        compression: polars_parquet::write::CompressionOptions::Lz4Raw,
                        data_pagesize_limit: None,
                    },
                )
                .context("encode row group")?;

                Ok((rg, schema))
            });

            encode_jobs.push_back(fut);
        }

        if stop {
            break;
        }
    }

    while let Some(fut) = encode_jobs.pop_front() {
        let (rg, schema) = fut
            .await
            .context("join prepare task")?
            .context("prepare row group")?;
        if writer.is_none() {
            writer = Some(make_writer(&schema).await.context("create writer")?);
        }
        writer
            .as_mut()
            .unwrap()
            .write(rg)
            .await
            .context("write encoded row group to file")?;
    }

    if let Some(writer) = writer.as_mut() {
        let _size = writer.end(None).await.context("write footer")?;
    }

    Ok(())
}

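// Handle to an in-flight encode job on the rayon pool; it resolves to the encoded row group
// together with the (possibly remapped) schema it was encoded with.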
type EncodeFut = tokio::sync::oneshot::Receiver<
    Result<(
        DynIter<
            'static,
            std::result::Result<
                DynStreamingIterator<'static, CompressedPage, PolarsError>,
                PolarsError,
            >,
        >,
        Arc<Schema>,
    )>,
>;

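// Encodes one arrow chunk into parquet pages using plain encoding, compresses the pages with the
// compression chosen in `write_options`, and packages the result as a row group iterator that the
// file streamer can consume.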
fn encode_row_group(
    batch: ArrowBatch,
    write_options: WriteOptions,
) -> Result<RowGroupIter<'static, PolarsError>> {
    let fields = batch
        .schema
        .fields
        .iter()
        .map(|field| to_parquet_type(field).context("map to parquet field"))
        .collect::<Result<Vec<_>>>()?;
    let encodings = batch
        .schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
        .collect::<Vec<_>>();

    let data = batch
        .chunk
        .arrays()
        .iter()
        .zip(fields)
        .zip(encodings)
        .flat_map(move |((array, type_), encoding)| {
            let encoded_columns = array_to_columns(array, type_, write_options, &encoding).unwrap();
            encoded_columns
                .into_iter()
                .map(|encoded_pages| {
                    let pages = encoded_pages;

                    let pages = DynIter::new(
                        pages
                            .into_iter()
                            .map(|x| x.map_err(|e| ParquetError::OutOfSpec(e.to_string()))),
                    );

                    let compressed_pages = pages
                        .map(|page| {
                            let page = page?;
                            polars_parquet::write::compress(page, vec![], write_options.compression)
                                .map_err(PolarsError::from)
                        })
                        .collect::<Vec<_>>();

                    Ok(DynStreamingIterator::new(CompressedPageIter {
                        data: compressed_pages.into_iter(),
                        current: None,
                    }))
                })
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();
    Ok(DynIter::new(data.into_iter()))
}

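// Adapter exposing an already-compressed vector of pages through the `FallibleStreamingIterator`
// interface expected by the parquet writer.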
struct CompressedPageIter {
    data: std::vec::IntoIter<std::result::Result<CompressedPage, PolarsError>>,
    current: Option<CompressedPage>,
}

impl FallibleStreamingIterator for CompressedPageIter {
    type Item = CompressedPage;
    type Error = PolarsError;

    fn get(&self) -> Option<&Self::Item> {
        self.current.as_ref()
    }

    fn advance(&mut self) -> std::result::Result<(), Self::Error> {
        self.current = match self.data.next() {
            Some(page) => Some(page?),
            None => None,
        };
        Ok(())
    }
}

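// Target number of rows to buffer before a parquet row group is encoded and written.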
const ROW_GROUP_MAX_ROWS: usize = 10_000;

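// Decodes a batch of raw logs according to the configured event signature. Indexed parameters are
// read from the `topic1`..`topic3` columns (topic0 carries the event signature hash and holds no
// parameter data), while non-indexed parameters are ABI-decoded from `data`. A body consisting of
// a single `uint256` takes a shortcut via `decode_erc20_amount`, and logs whose body fails to
// decode are written as nulls instead of failing the whole batch.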
fn decode_logs_batch(sig: &Option<alloy_json_abi::Event>, batch: ArrowBatch) -> Result<ArrowBatch> {
    let schema =
        schema_from_event_signature(sig).context("build arrow schema from event signature")?;

    if batch.chunk.is_empty() {
        return Ok(ArrowBatch {
            chunk: Arc::new(empty_chunk(&schema)),
            schema: Arc::new(schema),
        });
    }

    let sig = match sig {
        Some(sig) => sig,
        None => {
            return Ok(ArrowBatch {
                chunk: Arc::new(empty_chunk(&schema)),
                schema: Arc::new(schema),
            })
        }
    };

    let event = sig.resolve().context("resolve signature into event")?;

    let topic_cols = event
        .indexed()
        .iter()
        .zip(["topic1", "topic2", "topic3"].iter())
        .map(|(decoder, topic_name)| {
            let col = batch
                .column::<BinaryViewArray>(topic_name)
                .context("get column")?;
            let col = decode_col(col, decoder).context("decode column")?;
            Ok::<_, anyhow::Error>(col)
        })
        .collect::<Result<Vec<_>>>()?;

    let body_cols = if event.body() == [DynSolType::Uint(256)] {
        let data = batch
            .column::<BinaryViewArray>("data")
            .context("get column")?;
        vec![decode_erc20_amount(data, &DynSolType::Uint(256)).context("decode amount column")?]
    } else if !event.body().is_empty() {
        let data = batch
            .column::<BinaryViewArray>("data")
            .context("get column")?;

        let tuple_decoder = DynSolType::Tuple(event.body().to_vec());

        let mut decoded_tuples = Vec::with_capacity(data.len());
        for val in data.values_iter() {
            let tuple = tuple_decoder
                .abi_decode(val)
                .context("decode body tuple")
                .and_then(|v| {
                    let tuple = v
                        .as_tuple()
                        .context("expected tuple after decoding")?
                        .to_vec();

                    if tuple.len() != event.body().len() {
                        return Err(anyhow!(
                            "expected tuple of length {} after decoding",
                            event.body().len()
                        ));
                    }

                    Ok(Some(tuple))
                });

            let tuple = match tuple {
                Err(e) => {
                    log::error!(
                        "failed to decode body of a log, will write null instead. Error was: {:?}",
                        e
                    );
                    None
                }
                Ok(v) => v,
            };

            decoded_tuples.push(tuple);
        }

        let mut decoded_cols = Vec::with_capacity(event.body().len());

        for (i, ty) in event.body().iter().enumerate() {
            decoded_cols.push(
                decode_body_col(
                    decoded_tuples
                        .iter()
                        .map(|t| t.as_ref().map(|t| t.get(i).unwrap())),
                    ty,
                )
                .context("decode body column")?,
            );
        }

        decoded_cols
    } else {
        Vec::new()
    };

    let mut cols = topic_cols;
    cols.extend_from_slice(&body_cols);

    let chunk = Arc::new(ArrowChunk::try_new(cols).context("create arrow chunk")?);

    Ok(ArrowBatch {
        chunk,
        schema: Arc::new(schema),
    })
}

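// Builds an arrow column from already-decoded body values: booleans become a boolean array, while
// integers are stored as 32-byte big-endian values and the remaining supported types as raw bytes
// in a binary view array.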
fn decode_body_col<'a, I: ExactSizeIterator<Item = Option<&'a DynSolValue>>>(
    vals: I,
    ty: &DynSolType,
) -> Result<Box<dyn Array>> {
    match ty {
        DynSolType::Bool => {
            let mut builder = MutableBooleanArray::with_capacity(vals.len());

            for val in vals {
                let val = match val {
                    Some(val) => val,
                    None => {
                        builder.push_null();
                        continue;
                    }
                };

                match val {
                    DynSolValue::Bool(b) => builder.push(Some(*b)),
                    v => {
                        return Err(anyhow!(
                            "unexpected output type from decode: {:?}",
                            v.as_type()
                        ))
                    }
                }
            }

            Ok(builder.as_box())
        }
        _ => {
            let mut builder = MutableBinaryViewArray::<[u8]>::new();

            for val in vals {
                let val = match val {
                    Some(val) => val,
                    None => {
                        builder.push_null();
                        continue;
                    }
                };

                match val {
                    DynSolValue::Int(v, _) => builder.push(Some(v.to_be_bytes::<32>())),
                    DynSolValue::Uint(v, _) => builder.push(Some(v.to_be_bytes::<32>())),
                    DynSolValue::FixedBytes(v, _) => builder.push(Some(v)),
                    DynSolValue::Address(v) => builder.push(Some(v)),
                    DynSolValue::Bytes(v) => builder.push(Some(v)),
                    DynSolValue::String(v) => builder.push(Some(v)),
                    v => {
                        return Err(anyhow!(
                            "unexpected output type from decode: {:?}",
                            v.as_type()
                        ))
                    }
                }
            }

            Ok(builder.as_box())
        }
    }
}

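// Decodes the `data` column as a single uint256 amount. Empty `data` is substituted with 32 zero
// bytes so such logs decode to an amount of zero instead of producing a decode error.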
fn decode_erc20_amount(data: &BinaryViewArray, decoder: &DynSolType) -> Result<Box<dyn Array>> {
    let mut builder = MutableBinaryViewArray::<[u8]>::new();

    for val in data.values_iter() {
        let v = if val.is_empty() {
            [0; 32].as_slice()
        } else {
            val
        };

        match decoder.abi_decode(v).context("decode val")? {
            DynSolValue::Uint(v, _) => builder.push(Some(v.to_be_bytes::<32>())),
            v => {
                return Err(anyhow!(
                    "unexpected output type from decode: {:?}",
                    v.as_type()
                ))
            }
        }
    }

    Ok(builder.as_box())
}

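// ABI-decodes a single topic column with the given decoder, producing a boolean array for `bool`
// parameters and a binary view array (integers as 32-byte big-endian values, other supported
// types as raw bytes) otherwise.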
fn decode_col(col: &BinaryViewArray, decoder: &DynSolType) -> Result<Box<dyn Array>> {
    match decoder {
        DynSolType::Bool => {
            let mut builder = MutableBooleanArray::with_capacity(col.len());

            for val in col.iter() {
                let val = match val {
                    Some(val) => val,
                    None => {
                        builder.push_null();
                        continue;
                    }
                };
                match decoder.abi_decode(val).context("decode sol value")? {
                    DynSolValue::Bool(b) => builder.push(Some(b)),
                    v => {
                        return Err(anyhow!(
                            "unexpected output type from decode: {:?}",
                            v.as_type()
                        ))
                    }
                }
            }

            Ok(builder.as_box())
        }
        _ => {
            let mut builder = MutableBinaryViewArray::<[u8]>::new();

            for val in col.iter() {
                let val = match val {
                    Some(val) => val,
                    None => {
                        builder.push_null();
                        continue;
                    }
                };

                match decoder.abi_decode(val).context("decode sol value")? {
                    DynSolValue::Int(v, _) => builder.push(Some(v.to_be_bytes::<32>())),
                    DynSolValue::Uint(v, _) => builder.push(Some(v.to_be_bytes::<32>())),
                    DynSolValue::FixedBytes(v, _) => builder.push(Some(v)),
                    DynSolValue::Address(v) => builder.push(Some(v)),
                    DynSolValue::Bytes(v) => builder.push(Some(v)),
                    DynSolValue::String(v) => builder.push(Some(v)),
                    v => {
                        return Err(anyhow!(
                            "unexpected output type from decode: {:?}",
                            v.as_type()
                        ))
                    }
                }
            }

            Ok(builder.as_box())
        }
    }
}

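// Builds the arrow schema for decoded logs: indexed parameters come first, followed by body
// parameters, matching the column order produced by `decode_logs_batch`. When no signature is
// configured a single nullable dummy field is returned so downstream code still sees a valid
// schema.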
fn schema_from_event_signature(sig: &Option<alloy_json_abi::Event>) -> Result<Schema> {
    let sig = match sig {
        Some(sig) => sig,
        None => {
            return Ok(Schema::from(vec![Field::new(
                "dummy",
                DataType::Boolean,
                true,
            )]));
        }
    };

    let event = sig.resolve().context("resolve signature into event")?;

    let mut fields: Vec<Field> = Vec::with_capacity(sig.inputs.len());

    for (input, resolved_type) in sig
        .inputs
        .iter()
        .filter(|i| i.indexed)
        .zip(event.indexed().iter())
    {
        fields.push(process_input(&fields, input, resolved_type).context("process input")?);
    }

    for (input, resolved_type) in sig
        .inputs
        .iter()
        .filter(|i| !i.indexed)
        .zip(event.body().iter())
    {
        fields.push(process_input(&fields, input, resolved_type).context("process input")?);
    }

    Ok(Schema::from(fields))
}

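// Validates a single event input (non-empty name, no duplicates, parsed type equal to the
// resolved type) and converts it into a nullable arrow field.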
fn process_input(
    fields: &[Field],
    input: &EventParam,
    resolved_type: &DynSolType,
) -> Result<Field> {
    if input.name.is_empty() {
        return Err(anyhow!("empty param names are not supported"));
    }

    if fields
        .iter()
        .any(|f| f.name.as_str() == input.name.as_str())
    {
        return Err(anyhow!("duplicate param name: {}", input.name));
    }

    let ty = DynSolType::parse(&input.ty).context("parse solidity type")?;

    if &ty != resolved_type {
        return Err(anyhow!(
            "Internal error: Parsed type doesn't match resolved type. This should never happen."
        ));
    }

    let dt = simple_type_to_data_type(&ty).context("convert simple type to arrow datatype")?;

    Ok(Field::new(input.name.clone(), dt, true))
}

fn simple_type_to_data_type(ty: &DynSolType) -> Result<DataType> {
    match ty {
        DynSolType::Bool => Ok(DataType::Boolean),
        DynSolType::Int(_) => Ok(DataType::BinaryView),
        DynSolType::Uint(_) => Ok(DataType::BinaryView),
        DynSolType::FixedBytes(_) => Ok(DataType::BinaryView),
        DynSolType::Address => Ok(DataType::BinaryView),
        DynSolType::Bytes => Ok(DataType::BinaryView),
        DynSolType::String => Ok(DataType::BinaryView),
        ty => Err(anyhow!(
            "Complex types are not supported. Unexpected type: {}",
            ty
        )),
    }
}

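// Converts i32-offset Binary and Utf8 columns into their view-array counterparts so the rest of
// the pipeline can assume `BinaryView`/`Utf8View` data, and rebuilds the schema to match the new
// column types.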
pub fn map_batch_to_binary_view(batch: ArrowBatch) -> ArrowBatch {
    let cols = batch
        .chunk
        .arrays()
        .iter()
        .map(|col| match col.data_type() {
            DataType::Binary => BinaryViewArray::arr_from_iter(
                col.as_any()
                    .downcast_ref::<BinaryArray<i32>>()
                    .unwrap()
                    .iter(),
            )
            .boxed(),
            DataType::Utf8 => Utf8ViewArray::arr_from_iter(
                col.as_any()
                    .downcast_ref::<Utf8Array<i32>>()
                    .unwrap()
                    .iter(),
            )
            .boxed(),
            _ => col.clone(),
        })
        .collect::<Vec<_>>();

    let fields = cols
        .iter()
        .zip(batch.schema.fields.iter())
        .map(|(col, field)| {
            Field::new(
                field.name.clone(),
                col.data_type().clone(),
                field.is_nullable,
            )
        })
        .collect::<Vec<_>>();

    let schema = Schema {
        fields,
        metadata: Default::default(),
    };

    ArrowBatch {
        chunk: Arc::new(ArrowChunk::new(cols)),
        schema: Arc::new(schema),
    }
}

#[cfg(test)]
mod tests {
    use alloy_json_abi::Event;

    use super::*;

    #[test]
    fn test_trailing_indexed_to_schema() {
        let schema = schema_from_event_signature(&Some(Event::parse(
            "Swap(address indexed sender, uint amount0In, uint amount1In, uint amount0Out, uint amount1Out, address indexed to)"
        ).unwrap())).unwrap();

        assert_eq!(
            schema,
            Schema::from(vec![
                Field::new("sender", DataType::BinaryView, true),
                Field::new("to", DataType::BinaryView, true),
                Field::new("amount0In", DataType::BinaryView, true),
                Field::new("amount1In", DataType::BinaryView, true),
                Field::new("amount0Out", DataType::BinaryView, true),
                Field::new("amount1Out", DataType::BinaryView, true),
            ])
        );
    }
}