datafusion_ethers/convert/
hybrid.rs1use alloy::dyn_abi::{DynSolEvent, Specifier};
2use alloy::json_abi::Event;
3use alloy::rpc::types::eth::Log;
4use datafusion::arrow::array::RecordBatch;
5use datafusion::arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
6use std::sync::Arc;
7
8use crate::stream::StreamOptions;
9
10use super::{AppendError, Transcoder};
11
12pub struct EthRawAndDecodedLogsToArrow {
17 schema: SchemaRef,
18 raw: super::EthRawLogsToArrow,
19 decoded: super::EthDecodedLogsToArrow,
20}
21
22impl EthRawAndDecodedLogsToArrow {
23 pub fn new(options: &StreamOptions, event_type: Event, resolved_type: DynSolEvent) -> Self {
24 let raw = super::EthRawLogsToArrow::new(options);
25 let decoded = super::EthDecodedLogsToArrow::new(event_type, resolved_type);
26
27 let mut builder = SchemaBuilder::from(&raw.schema().fields);
28 builder.push(Field::new(
29 "event",
30 DataType::Struct(decoded.schema().fields.clone()),
31 false,
32 ));
33 Self {
34 schema: Arc::new(builder.finish()),
35 raw,
36 decoded,
37 }
38 }
39
40 pub fn new_from_signature(
41 options: &StreamOptions,
42 signature: &str,
43 ) -> Result<Self, alloy::dyn_abi::Error> {
44 let event_type = alloy::json_abi::Event::parse(signature)?;
45 let resolved_type = event_type.resolve()?;
46 Ok(Self::new(options, event_type, resolved_type))
47 }
48}
49
50impl Transcoder for EthRawAndDecodedLogsToArrow {
51 fn schema(&self) -> SchemaRef {
52 self.schema.clone()
53 }
54
55 #[allow(clippy::get_first)]
56 fn append(&mut self, logs: &[Log]) -> Result<(), AppendError> {
57 self.raw.append(logs)?;
58 self.decoded.append(logs)?;
59 Ok(())
60 }
61
62 fn len(&self) -> usize {
63 self.raw.len()
64 }
65
66 fn finish(&mut self) -> RecordBatch {
67 let raw = self.raw.finish();
68 let decoded = self.decoded.finish();
69
70 let event_col = datafusion::arrow::array::StructArray::new(
71 decoded.schema().fields().clone(),
72 decoded.columns().to_vec(),
73 None,
74 );
75
76 let mut columns = raw.columns().to_vec();
77 columns.push(Arc::new(event_col));
78
79 RecordBatch::try_new(self.schema.clone(), columns).unwrap()
80 }
81}
82
83