skar_client/parquet_out.rs

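//! Streams query results into a folder of parquet files, one writer task per
//! table: blocks, transactions, logs, traces and decoded logs.
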
use std::{
    collections::{BTreeMap, VecDeque},
    path::PathBuf,
    sync::Arc,
    time::Instant,
};

use alloy_dyn_abi::{DynSolType, DynSolValue, Specifier};
use alloy_json_abi::EventParam;
use anyhow::{anyhow, Context, Result};
use polars_arrow::{
    array::{
        Array, ArrayFromIter, BinaryArray, BinaryViewArray, MutableArray, MutableBinaryViewArray,
        MutableBooleanArray, Utf8Array, Utf8ViewArray,
    },
    datatypes::{ArrowDataType as DataType, ArrowSchema as Schema, Field},
    legacy::error::PolarsError,
};
use polars_parquet::{
    parquet::write::FileStreamer,
    read::ParquetError,
    write::{
        array_to_columns, to_parquet_schema, to_parquet_type, transverse, CompressedPage, DynIter,
        DynStreamingIterator, Encoding, FallibleStreamingIterator, RowGroupIter, WriteOptions,
    },
};
use rayon::prelude::*;
use skar_net_types::Query;
use skar_schema::{concat_chunks, empty_chunk};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::compat::TokioAsyncReadCompatExt;

use crate::{
    column_mapping, rayon_async, types::StreamConfig, ArrowBatch, ArrowChunk, Client, ParquetConfig,
};

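/// Runs `query` against `client` and streams the results into parquet files
/// under `config.path`: `blocks.parquet`, `transactions.parquet`,
/// `logs.parquet`, `traces.parquet` and `decoded_logs.parquet`.
///
/// A minimal usage sketch; how `Client`, `Query` and `ParquetConfig` are
/// constructed depends on the caller's setup, so this is illustrative only:
///
/// ```ignore
/// let config = ParquetConfig {
///     path: "out".to_owned(),
///     ..Default::default() // assumes ParquetConfig implements Default
/// };
/// create_parquet_folder(&client, query, config).await?;
/// ```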
pub async fn create_parquet_folder(
    client: &Client,
    query: Query,
    config: ParquetConfig,
) -> Result<()> {
    let path = PathBuf::from(config.path);

    tokio::fs::create_dir_all(&path)
        .await
        .context("create parquet dir")?;

    // Spawn one writer task per output table, each backed by its own file.
    let mut blocks_path = path.clone();
    blocks_path.push("blocks.parquet");
    let (mut blocks_sender, blocks_join) =
        spawn_writer(blocks_path, &config.column_mapping.block, config.hex_output)?;

    let mut transactions_path = path.clone();
    transactions_path.push("transactions.parquet");
    let (mut transactions_sender, transactions_join) = spawn_writer(
        transactions_path,
        &config.column_mapping.transaction,
        config.hex_output,
    )?;

    let mut logs_path = path.clone();
    logs_path.push("logs.parquet");
    let (mut logs_sender, logs_join) =
        spawn_writer(logs_path, &config.column_mapping.log, config.hex_output)?;

    let mut traces_path = path.clone();
    traces_path.push("traces.parquet");
    let (mut traces_sender, traces_join) =
        spawn_writer(traces_path, &config.column_mapping.trace, config.hex_output)?;

    let event_signature = match &config.event_signature {
        Some(sig) => Some(alloy_json_abi::Event::parse(sig).context("parse event signature")?),
        None => None,
    };

    let mut decoded_logs_path = path.clone();
    decoded_logs_path.push("decoded_logs.parquet");
    let (mut decoded_logs_sender, decoded_logs_join) = spawn_writer(
        decoded_logs_path,
        &config.column_mapping.decoded_log,
        config.hex_output,
    )?;

    let mut rx = client
        .stream::<crate::ArrowIpc>(
            query,
            StreamConfig {
                concurrency: config.concurrency,
                batch_size: config.batch_size,
                retry: config.retry,
            },
        )
        .await
        .context("start stream")?;

    while let Some(resp) = rx.recv().await {
        let resp = resp.context("get query response")?;

        log::trace!("got data up to block {}", resp.next_block);

        let blocks_fut = async move {
            for batch in resp.data.blocks {
                let batch = map_batch_to_binary_view(batch);
                blocks_sender
                    .send(batch)
                    .await
                    .context("write blocks chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(blocks_sender)
        };

        let txs_fut = async move {
            for batch in resp.data.transactions {
                let batch = map_batch_to_binary_view(batch);
                transactions_sender
                    .send(batch)
                    .await
                    .context("write transactions chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(transactions_sender)
        };

        // The logs data is needed twice: once for the raw logs file and once
        // for decoding, so clone it for the raw writer.
        let logs_fut = {
            let data = resp.data.logs.clone();
            async move {
                for batch in data {
                    let batch = map_batch_to_binary_view(batch);
                    logs_sender
                        .send(batch)
                        .await
                        .context("write logs chunk to parquet")?;
                }

                Ok::<_, anyhow::Error>(logs_sender)
            }
        };

        let traces_fut = async move {
            for batch in resp.data.traces {
                let batch = map_batch_to_binary_view(batch);
                traces_sender
                    .send(batch)
                    .await
                    .context("write traces chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(traces_sender)
        };

        let sig = Arc::new(event_signature.clone());
        let decoded_logs_fut = async move {
            for batch in resp.data.logs {
                let sig = sig.clone();
                let batch = map_batch_to_binary_view(batch);
                // Decoding is CPU bound, so run it on the rayon pool instead
                // of blocking the async executor.
                let batch = rayon_async::spawn(move || decode_logs_batch(&sig, batch))
                    .await
                    .context("join decode logs task")?
                    .context("decode logs")?;

                decoded_logs_sender
                    .send(batch)
                    .await
                    .context("write decoded_logs chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(decoded_logs_sender)
        };

        let start = Instant::now();

        // Write to all five files concurrently; the futures hand the senders
        // back so they can be reused on the next iteration.
        (
            blocks_sender,
            transactions_sender,
            logs_sender,
            traces_sender,
            decoded_logs_sender,
        ) = futures::future::try_join5(blocks_fut, txs_fut, logs_fut, traces_fut, decoded_logs_fut)
            .await
            .context("write to parquet")?;

        log::trace!("wrote to parquet in {} ms", start.elapsed().as_millis());
    }

    // Drop the senders to close the channels so the writer tasks can flush
    // their remaining data and finish their files.
    std::mem::drop(blocks_sender);
    std::mem::drop(transactions_sender);
    std::mem::drop(logs_sender);
    std::mem::drop(traces_sender);
    std::mem::drop(decoded_logs_sender);

    blocks_join
        .await
        .context("join blocks task")?
        .context("finish blocks file")?;
    transactions_join
        .await
        .context("join transactions task")?
        .context("finish transactions file")?;
    logs_join
        .await
        .context("join logs task")?
        .context("finish logs file")?;
    traces_join
        .await
        .context("join traces task")?
        .context("finish traces file")?;
    decoded_logs_join
        .await
        .context("join decoded_logs task")?
        .context("finish decoded_logs file")?;

    Ok(())
}

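/// Hex encodes every `BinaryView` column of the chunk into a `Utf8View`
/// column; other columns pass through unchanged.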
fn hex_encode_chunk(chunk: &ArrowChunk) -> anyhow::Result<ArrowChunk> {
    let cols = chunk
        .columns()
        .par_iter()
        .map(|col| {
            let col = match col.data_type() {
                DataType::BinaryView => Box::new(hex_encode(col.as_any().downcast_ref().unwrap())),
                _ => col.clone(),
            };

            Ok::<_, anyhow::Error>(col)
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(ArrowChunk::new(cols))
}

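/// Hex encodes a binary array into a string array, preserving nulls.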
fn hex_encode(input: &BinaryViewArray) -> Utf8ViewArray {
    let mut arr = MutableBinaryViewArray::<str>::new();

    for buf in input.iter() {
        arr.push(buf.map(faster_hex::hex_string));
    }

    arr.into()
}

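/// Spawns a writer task for a single parquet file and returns the channel for
/// feeding it batches along with the task's join handle. Dropping the sender
/// signals the task to finish the file.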
fn spawn_writer(
    path: PathBuf,
    mapping: &BTreeMap<String, column_mapping::DataType>,
    hex_output: bool,
) -> Result<(mpsc::Sender<ArrowBatch>, JoinHandle<Result<()>>)> {
    let (tx, rx) = mpsc::channel(64);

    let mapping = Arc::new(mapping.clone());

    let handle = tokio::task::spawn(async move {
        match run_writer(rx, path, mapping, hex_output).await {
            Ok(v) => Ok(v),
            Err(e) => {
                // Log here as well, since the error only propagates when the
                // task is joined at the very end.
                log::error!("failed to run parquet writer: {:?}", e);
                Err(e)
            }
        }
    });

    Ok((tx, handle))
}

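/// Writer loop for a single parquet file. Accumulates incoming batches into
/// row groups of roughly `ROW_GROUP_MAX_ROWS` rows, encodes them on the rayon
/// pool (keeping up to `num_cpus` encode jobs in flight), and streams the
/// encoded row groups to disk. The file is only created once the first row
/// group is ready, since the final schema depends on the column mapping and
/// hex output settings.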
async fn run_writer(
    mut rx: mpsc::Receiver<ArrowBatch>,
    path: PathBuf,
    mapping: Arc<BTreeMap<String, crate::DataType>>,
    hex_output: bool,
) -> Result<()> {
    let make_writer = move |schema: &Schema| {
        let schema = schema.clone();
        let path = path.clone();
        async move {
            let write_options = polars_parquet::parquet::write::WriteOptions {
                write_statistics: true,
                version: polars_parquet::parquet::write::Version::V2,
            };

            let file = tokio::fs::File::create(&path)
                .await
                .context("create parquet file")?
                .compat();

            let parquet_schema = to_parquet_schema(&schema).context("to parquet schema")?;

            let writer = FileStreamer::new(file, parquet_schema, write_options, None);

            Ok::<_, anyhow::Error>(writer)
        }
    };

    let mut writer = None;

    let num_cpus = num_cpus::get();
    let mut encode_jobs = VecDeque::<EncodeFut>::with_capacity(num_cpus);

    let mut data = Vec::new();
    let mut total_rows = 0;
    loop {
        let mut stop = false;
        if let Some(batch) = rx.recv().await {
            total_rows += batch.chunk.len();
            data.push(batch);
        } else {
            stop = true;
        }

        // Flush a row group once enough rows have accumulated, or when the
        // channel closes with data still buffered.
        if !data.is_empty() && (stop || total_rows >= ROW_GROUP_MAX_ROWS) {
            let batches = std::mem::take(&mut data);
            // Apply backpressure: if the encode pipeline is full, finish the
            // oldest job and write its row group before queueing a new one.
            if encode_jobs.len() >= num_cpus {
                let fut = encode_jobs.pop_front().unwrap();
                let (rg, schema) = fut
                    .await
                    .context("join prepare task")?
                    .context("prepare row group")?;
                if writer.is_none() {
                    writer = Some(make_writer(&schema).await.context("create writer")?);
                }
                writer
                    .as_mut()
                    .unwrap()
                    .write(rg)
                    .await
                    .context("write encoded row group to file")?;
            }

            total_rows = 0;
            let schema = batches[0].schema.clone();
            let chunks = batches.into_iter().map(|b| b.chunk).collect::<Vec<_>>();

            let chunk = concat_chunks(chunks.as_slice()).context("concat chunks")?;
            let mapping = mapping.clone();
            // Column mapping, hex encoding and parquet encoding are all CPU
            // bound, so run them on the rayon pool.
            let fut = rayon_async::spawn(move || {
                let field_names = schema
                    .fields
                    .iter()
                    .map(|f| f.name.as_str())
                    .collect::<Vec<&str>>();
                let chunk = column_mapping::apply_to_chunk(&chunk, &field_names, &mapping)
                    .context("apply column mapping to batch")?;

                let chunk = if hex_output {
                    hex_encode_chunk(&chunk).context("hex encode batch")?
                } else {
                    chunk
                };
                let chunk = Arc::new(chunk);

                // Rebuild the schema from the mapped columns, since mapping
                // and hex encoding may have changed the column data types.
                let schema = chunk
                    .iter()
                    .zip(schema.fields.iter())
                    .map(|(col, field)| {
                        Field::new(
                            field.name.clone(),
                            col.data_type().clone(),
                            field.is_nullable,
                        )
                    })
                    .collect::<Vec<_>>();
                let schema = Arc::new(Schema::from(schema));

                let rg = encode_row_group(
                    ArrowBatch {
                        chunk,
                        schema: schema.clone(),
                    },
                    WriteOptions {
                        write_statistics: true,
                        version: polars_parquet::write::Version::V2,
                        compression: polars_parquet::write::CompressionOptions::Lz4Raw,
                        data_pagesize_limit: None,
                    },
                )
                .context("encode row group")?;

                Ok((rg, schema))
            });

            encode_jobs.push_back(fut);
        }

        if stop {
            break;
        }
    }

    // Drain the remaining encode jobs in order.
    while let Some(fut) = encode_jobs.pop_front() {
        let (rg, schema) = fut
            .await
            .context("join prepare task")?
            .context("prepare row group")?;
        if writer.is_none() {
            writer = Some(make_writer(&schema).await.context("create writer")?);
        }
        writer
            .as_mut()
            .unwrap()
            .write(rg)
            .await
            .context("write encoded row group to file")?;
    }

    // Write the parquet footer. If no data ever arrived, no file is created.
    if let Some(writer) = writer.as_mut() {
        let _size = writer.end(None).await.context("write footer")?;
    }

    Ok(())
}

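/// Receiver for an encode job running on the rayon pool: resolves to the
/// encoded row group along with the schema it was encoded under.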
type EncodeFut = tokio::sync::oneshot::Receiver<
    Result<(
        DynIter<
            'static,
            std::result::Result<
                DynStreamingIterator<'static, CompressedPage, PolarsError>,
                PolarsError,
            >,
        >,
        Arc<Schema>,
    )>,
>;

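/// Encodes and compresses a batch into a single parquet row group. All pages
/// are compressed eagerly here (on the rayon pool) so the async writer only
/// has to copy bytes to disk.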
fn encode_row_group(
    batch: ArrowBatch,
    write_options: WriteOptions,
) -> Result<RowGroupIter<'static, PolarsError>> {
    let fields = batch
        .schema
        .fields
        .iter()
        .map(|field| to_parquet_type(field).context("map to parquet field"))
        .collect::<Result<Vec<_>>>()?;
    let encodings = batch
        .schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
        .collect::<Vec<_>>();

    let data = batch
        .chunk
        .arrays()
        .iter()
        .zip(fields)
        .zip(encodings)
        .flat_map(move |((array, type_), encoding)| {
            // A single arrow array can map to multiple parquet columns
            // (e.g. nested types), hence the flat_map.
            let encoded_columns = array_to_columns(array, type_, write_options, &encoding).unwrap();
            encoded_columns
                .into_iter()
                .map(|encoded_pages| {
                    let pages = DynIter::new(
                        encoded_pages
                            .into_iter()
                            .map(|x| x.map_err(|e| ParquetError::OutOfSpec(e.to_string()))),
                    );

                    let compressed_pages = pages
                        .map(|page| {
                            let page = page?;
                            polars_parquet::write::compress(page, vec![], write_options.compression)
                                .map_err(PolarsError::from)
                        })
                        .collect::<Vec<_>>();

                    Ok(DynStreamingIterator::new(CompressedPageIter {
                        data: compressed_pages.into_iter(),
                        current: None,
                    }))
                })
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();
    Ok(DynIter::new(data.into_iter()))
}

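/// Adapts a vector of already-compressed pages to the
/// `FallibleStreamingIterator` interface the parquet writer expects.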
struct CompressedPageIter {
    data: std::vec::IntoIter<std::result::Result<CompressedPage, PolarsError>>,
    current: Option<CompressedPage>,
}

impl FallibleStreamingIterator for CompressedPageIter {
    type Item = CompressedPage;
    type Error = PolarsError;

    fn get(&self) -> Option<&Self::Item> {
        self.current.as_ref()
    }

    fn advance(&mut self) -> std::result::Result<(), Self::Error> {
        self.current = match self.data.next() {
            Some(page) => Some(page?),
            None => None,
        };
        Ok(())
    }
}

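/// Target number of rows per parquet row group.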
const ROW_GROUP_MAX_ROWS: usize = 10_000;

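/// Decodes a batch of raw logs using the given event signature. Expects the
/// batch to contain `topic1`, `topic2`, `topic3` and `data` columns; indexed
/// parameters are decoded from the topics and the remaining parameters from
/// `data`. Rows whose body fails to decode are written as nulls rather than
/// failing the whole batch.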
fn decode_logs_batch(sig: &Option<alloy_json_abi::Event>, batch: ArrowBatch) -> Result<ArrowBatch> {
    let schema =
        schema_from_event_signature(sig).context("build arrow schema from event signature")?;

    if batch.chunk.is_empty() {
        return Ok(ArrowBatch {
            chunk: Arc::new(empty_chunk(&schema)),
            schema: Arc::new(schema),
        });
    }

    let sig = match sig {
        Some(sig) => sig,
        None => {
            return Ok(ArrowBatch {
                chunk: Arc::new(empty_chunk(&schema)),
                schema: Arc::new(schema),
            })
        }
    };

    let event = sig.resolve().context("resolve signature into event")?;

    // topic0 is the event signature hash, so indexed parameters live in
    // topic1..topic3.
    let topic_cols = event
        .indexed()
        .iter()
        .zip(["topic1", "topic2", "topic3"].iter())
        .map(|(decoder, topic_name)| {
            let col = batch
                .column::<BinaryViewArray>(topic_name)
                .context("get column")?;
            let col = decode_col(col, decoder).context("decode column")?;
            Ok::<_, anyhow::Error>(col)
        })
        .collect::<Result<Vec<_>>>()?;

    // Special case a body that is a single uint256 (e.g. erc20 transfer
    // amounts) so empty data can be treated as zero.
    let body_cols = if event.body() == [DynSolType::Uint(256)] {
        let data = batch
            .column::<BinaryViewArray>("data")
            .context("get column")?;
        vec![decode_erc20_amount(data, &DynSolType::Uint(256)).context("decode amount column")?]
    } else if !event.body().is_empty() {
        let data = batch
            .column::<BinaryViewArray>("data")
            .context("get column")?;

        // Non-indexed parameters are abi-encoded together as a tuple.
        let tuple_decoder = DynSolType::Tuple(event.body().to_vec());

        let mut decoded_tuples = Vec::with_capacity(data.len());
        for val in data.values_iter() {
            let tuple = tuple_decoder
                .abi_decode(val)
                .context("decode body tuple")
                .and_then(|v| {
                    let tuple = v
                        .as_tuple()
                        .context("expected tuple after decoding")?
                        .to_vec();

                    if tuple.len() != event.body().len() {
                        return Err(anyhow!(
                            "expected tuple of length {} after decoding",
                            event.body().len()
                        ));
                    }

                    Ok(Some(tuple))
                });

            let tuple = match tuple {
                Err(e) => {
                    log::error!(
                        "failed to decode body of a log, will write null instead. Error was: {:?}",
                        e
                    );
                    None
                }
                Ok(v) => v,
            };

            decoded_tuples.push(tuple);
        }

        let mut decoded_cols = Vec::with_capacity(event.body().len());

        for (i, ty) in event.body().iter().enumerate() {
            decoded_cols.push(
                decode_body_col(
                    decoded_tuples
                        .iter()
                        .map(|t| t.as_ref().map(|t| t.get(i).unwrap())),
                    ty,
                )
                .context("decode body column")?,
            );
        }

        decoded_cols
    } else {
        Vec::new()
    };

    let mut cols = topic_cols;
    cols.extend_from_slice(&body_cols);

    let chunk = Arc::new(ArrowChunk::try_new(cols).context("create arrow chunk")?);

    Ok(ArrowBatch {
        chunk,
        schema: Arc::new(schema),
    })
}

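/// Builds an arrow column from the decoded body values of a single parameter.
/// Booleans become a boolean array; every other supported type is stored as
/// binary (ints and uints as 32-byte big-endian). `None` values become nulls.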
fn decode_body_col<'a, I: ExactSizeIterator<Item = Option<&'a DynSolValue>>>(
    vals: I,
    ty: &DynSolType,
) -> Result<Box<dyn Array>> {
    match ty {
        DynSolType::Bool => {
            let mut builder = MutableBooleanArray::with_capacity(vals.len());

            for val in vals {
                let val = match val {
                    Some(val) => val,
                    None => {
                        builder.push_null();
                        continue;
                    }
                };

                match val {
                    DynSolValue::Bool(b) => builder.push(Some(*b)),
                    v => {
                        return Err(anyhow!(
                            "unexpected output type from decode: {:?}",
                            v.as_type()
                        ))
                    }
                }
            }

            Ok(builder.as_box())
        }
        _ => {
            let mut builder = MutableBinaryViewArray::<[u8]>::new();

            for val in vals {
                let val = match val {
                    Some(val) => val,
                    None => {
                        builder.push_null();
                        continue;
                    }
                };

                match val {
                    DynSolValue::Int(v, _) => builder.push(Some(v.to_be_bytes::<32>())),
                    DynSolValue::Uint(v, _) => builder.push(Some(v.to_be_bytes::<32>())),
                    DynSolValue::FixedBytes(v, _) => builder.push(Some(v)),
                    DynSolValue::Address(v) => builder.push(Some(v)),
                    DynSolValue::Bytes(v) => builder.push(Some(v)),
                    DynSolValue::String(v) => builder.push(Some(v)),
                    v => {
                        return Err(anyhow!(
                            "unexpected output type from decode: {:?}",
                            v.as_type()
                        ))
                    }
                }
            }

            Ok(builder.as_box())
        }
    }
}

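/// Decodes a `data` column that holds a single uint256 per row, treating
/// empty data as zero.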
fn decode_erc20_amount(data: &BinaryViewArray, decoder: &DynSolType) -> Result<Box<dyn Array>> {
    let mut builder = MutableBinaryViewArray::<[u8]>::new();

    for val in data.values_iter() {
        // An empty body can show up when decoding zero-value erc20 transfers,
        // so treat it as a zero u256 instead of failing to decode.
        let v = if val.is_empty() {
            [0; 32].as_slice()
        } else {
            val
        };

        match decoder.abi_decode(v).context("decode val")? {
            DynSolValue::Uint(v, _) => builder.push(Some(v.to_be_bytes::<32>())),
            v => {
                return Err(anyhow!(
                    "unexpected output type from decode: {:?}",
                    v.as_type()
                ))
            }
        }
    }

    Ok(builder.as_box())
}

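/// Decodes a topic column with the given decoder. Same output convention as
/// `decode_body_col`: booleans become a boolean array, everything else binary.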
fn decode_col(col: &BinaryViewArray, decoder: &DynSolType) -> Result<Box<dyn Array>> {
    match decoder {
        DynSolType::Bool => {
            let mut builder = MutableBooleanArray::with_capacity(col.len());

            for val in col.iter() {
                let val = match val {
                    Some(val) => val,
                    None => {
                        builder.push_null();
                        continue;
                    }
                };
                match decoder.abi_decode(val).context("decode sol value")? {
                    DynSolValue::Bool(b) => builder.push(Some(b)),
                    v => {
                        return Err(anyhow!(
                            "unexpected output type from decode: {:?}",
                            v.as_type()
                        ))
                    }
                }
            }

            Ok(builder.as_box())
        }
        _ => {
            let mut builder = MutableBinaryViewArray::<[u8]>::new();

            for val in col.iter() {
                let val = match val {
                    Some(val) => val,
                    None => {
                        builder.push_null();
                        continue;
                    }
                };

                match decoder.abi_decode(val).context("decode sol value")? {
                    DynSolValue::Int(v, _) => builder.push(Some(v.to_be_bytes::<32>())),
                    DynSolValue::Uint(v, _) => builder.push(Some(v.to_be_bytes::<32>())),
                    DynSolValue::FixedBytes(v, _) => builder.push(Some(v)),
                    DynSolValue::Address(v) => builder.push(Some(v)),
                    DynSolValue::Bytes(v) => builder.push(Some(v)),
                    DynSolValue::String(v) => builder.push(Some(v)),
                    v => {
                        return Err(anyhow!(
                            "unexpected output type from decode: {:?}",
                            v.as_type()
                        ))
                    }
                }
            }

            Ok(builder.as_box())
        }
    }
}

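/// Builds the arrow schema for decoded logs from an event signature. Indexed
/// parameters come first, then body parameters, e.g. for
/// `Transfer(address indexed from, address indexed to, uint256 amount)` the
/// fields are `from`, `to`, `amount`. With no signature, a single nullable
/// `dummy` field is produced so an empty file can still be written.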
fn schema_from_event_signature(sig: &Option<alloy_json_abi::Event>) -> Result<Schema> {
    let sig = match sig {
        Some(sig) => sig,
        None => {
            return Ok(Schema::from(vec![Field::new(
                "dummy",
                DataType::Boolean,
                true,
            )]));
        }
    };

    let event = sig.resolve().context("resolve signature into event")?;

    let mut fields: Vec<Field> = Vec::with_capacity(sig.inputs.len());

    for (input, resolved_type) in sig
        .inputs
        .iter()
        .filter(|i| i.indexed)
        .zip(event.indexed().iter())
    {
        fields.push(process_input(&fields, input, resolved_type).context("process input")?);
    }

    for (input, resolved_type) in sig
        .inputs
        .iter()
        .filter(|i| !i.indexed)
        .zip(event.body().iter())
    {
        fields.push(process_input(&fields, input, resolved_type).context("process input")?);
    }

    Ok(Schema::from(fields))
}

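/// Converts a single event parameter to an arrow field, rejecting unnamed and
/// duplicate parameters and verifying the parsed type matches the resolved one.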
fn process_input(
    fields: &[Field],
    input: &EventParam,
    resolved_type: &DynSolType,
) -> Result<Field> {
    if input.name.is_empty() {
        return Err(anyhow!("empty param names are not supported"));
    }

    if fields
        .iter()
        .any(|f| f.name.as_str() == input.name.as_str())
    {
        return Err(anyhow!("duplicate param name: {}", input.name));
    }

    let ty = DynSolType::parse(&input.ty).context("parse solidity type")?;

    if &ty != resolved_type {
        return Err(anyhow!(
            "Internal error: Parsed type doesn't match resolved type. This should never happen."
        ));
    }

    let dt = simple_type_to_data_type(&ty).context("convert simple type to arrow datatype")?;

    Ok(Field::new(input.name.clone(), dt, true))
}

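/// Maps a simple solidity type to the arrow datatype used to store it.
/// Everything except `bool` is stored as binary; complex (nested) types are
/// not supported.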
fn simple_type_to_data_type(ty: &DynSolType) -> Result<DataType> {
    match ty {
        DynSolType::Bool => Ok(DataType::Boolean),
        DynSolType::Int(_) => Ok(DataType::BinaryView),
        DynSolType::Uint(_) => Ok(DataType::BinaryView),
        DynSolType::FixedBytes(_) => Ok(DataType::BinaryView),
        DynSolType::Address => Ok(DataType::BinaryView),
        DynSolType::Bytes => Ok(DataType::BinaryView),
        DynSolType::String => Ok(DataType::BinaryView),
        ty => Err(anyhow!(
            "Complex types are not supported. Unexpected type: {}",
            ty
        )),
    }
}

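/// Converts `Binary` and `Utf8` columns of a batch to their view-array
/// equivalents (`BinaryView`/`Utf8View`) and rebuilds the schema to match.
/// Other columns pass through unchanged.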
pub fn map_batch_to_binary_view(batch: ArrowBatch) -> ArrowBatch {
    let cols = batch
        .chunk
        .arrays()
        .iter()
        .map(|col| match col.data_type() {
            DataType::Binary => BinaryViewArray::arr_from_iter(
                col.as_any()
                    .downcast_ref::<BinaryArray<i32>>()
                    .unwrap()
                    .iter(),
            )
            .boxed(),
            DataType::Utf8 => Utf8ViewArray::arr_from_iter(
                col.as_any()
                    .downcast_ref::<Utf8Array<i32>>()
                    .unwrap()
                    .iter(),
            )
            .boxed(),
            _ => col.clone(),
        })
        .collect::<Vec<_>>();

    let fields = cols
        .iter()
        .zip(batch.schema.fields.iter())
        .map(|(col, field)| {
            Field::new(
                field.name.clone(),
                col.data_type().clone(),
                field.is_nullable,
            )
        })
        .collect::<Vec<_>>();

    let schema = Schema {
        fields,
        metadata: Default::default(),
    };

    ArrowBatch {
        chunk: Arc::new(ArrowChunk::new(cols)),
        schema: Arc::new(schema),
    }
}

#[cfg(test)]
mod tests {
    use alloy_json_abi::Event;

    use super::*;

    #[test]
    fn test_trailing_indexed_to_schema() {
        // Indexed params should come before body params in the schema, even
        // when an indexed param trails the body params in the signature.
        let schema = schema_from_event_signature(&Some(Event::parse(
            "Swap(address indexed sender, uint amount0In, uint amount1In, uint amount0Out, uint amount1Out, address indexed to)"
        ).unwrap())).unwrap();

        assert_eq!(
            schema,
            Schema::from(vec![
                Field::new("sender", DataType::BinaryView, true),
                Field::new("to", DataType::BinaryView, true),
                Field::new("amount0In", DataType::BinaryView, true),
                Field::new("amount1In", DataType::BinaryView, true),
                Field::new("amount0Out", DataType::BinaryView, true),
                Field::new("amount1Out", DataType::BinaryView, true),
            ])
        );
    }
}