datafusion_ethers/convert/
hybrid.rs

1use alloy::dyn_abi::{DynSolEvent, Specifier};
2use alloy::json_abi::Event;
3use alloy::rpc::types::eth::Log;
4use datafusion::arrow::array::RecordBatch;
5use datafusion::arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
6use std::sync::Arc;
7
8use crate::stream::StreamOptions;
9
10use super::{AppendError, Transcoder};
11
12///////////////////////////////////////////////////////////////////////////////////////////////////
13
14/// Transcodes raw Ethereum log events into Arrow record batches while also creating a nested
15/// struct with fields decoded according to the provided log signature
16pub struct EthRawAndDecodedLogsToArrow {
17    schema: SchemaRef,
18    raw: super::EthRawLogsToArrow,
19    decoded: super::EthDecodedLogsToArrow,
20}
21
22impl EthRawAndDecodedLogsToArrow {
23    pub fn new(options: &StreamOptions, event_type: Event, resolved_type: DynSolEvent) -> Self {
24        let raw = super::EthRawLogsToArrow::new(options);
25        let decoded = super::EthDecodedLogsToArrow::new(event_type, resolved_type);
26
27        let mut builder = SchemaBuilder::from(&raw.schema().fields);
28        builder.push(Field::new(
29            "event",
30            DataType::Struct(decoded.schema().fields.clone()),
31            false,
32        ));
33        Self {
34            schema: Arc::new(builder.finish()),
35            raw,
36            decoded,
37        }
38    }
39
40    pub fn new_from_signature(
41        options: &StreamOptions,
42        signature: &str,
43    ) -> Result<Self, alloy::dyn_abi::Error> {
44        let event_type = alloy::json_abi::Event::parse(signature)?;
45        let resolved_type = event_type.resolve()?;
46        Ok(Self::new(options, event_type, resolved_type))
47    }
48}
49
50impl Transcoder for EthRawAndDecodedLogsToArrow {
51    fn schema(&self) -> SchemaRef {
52        self.schema.clone()
53    }
54
55    #[allow(clippy::get_first)]
56    fn append(&mut self, logs: &[Log]) -> Result<(), AppendError> {
57        self.raw.append(logs)?;
58        self.decoded.append(logs)?;
59        Ok(())
60    }
61
62    fn len(&self) -> usize {
63        self.raw.len()
64    }
65
66    fn finish(&mut self) -> RecordBatch {
67        let raw = self.raw.finish();
68        let decoded = self.decoded.finish();
69
70        let event_col = datafusion::arrow::array::StructArray::new(
71            decoded.schema().fields().clone(),
72            decoded.columns().to_vec(),
73            None,
74        );
75
76        let mut columns = raw.columns().to_vec();
77        columns.push(Arc::new(event_col));
78
79        RecordBatch::try_new(self.schema.clone(), columns).unwrap()
80    }
81}
82
83///////////////////////////////////////////////////////////////////////////////////////////////////