datafusion_ethers/convert/
decoded.rs

1use alloy::dyn_abi::{DecodedEvent, DynSolEvent, DynSolType, DynSolValue, Specifier};
2use alloy::json_abi::{Event, EventParam};
3use alloy::primitives::Sign;
4use alloy::rpc::types::eth::Log;
5use datafusion::arrow::array::{self, Array, ArrayBuilder, RecordBatch};
6use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
7use std::sync::Arc;
8
9use super::{AppendError, Transcoder};
10
11///////////////////////////////////////////////////////////////////////////////////////////////////
12
13/// Transcodes decoded Ethereum log events into Arrow record batches
14pub struct EthDecodedLogsToArrow {
15    schema: SchemaRef,
16    event_decoder: DynSolEvent,
17    /// Indexed fields go first, then data fields, corresponding to how [DecodedEvent] stores fields
18    field_builders: Vec<Box<dyn SolidityArrayBuilder + Send>>,
19}
20
21impl EthDecodedLogsToArrow {
22    pub fn new(event_type: &Event) -> Self {
23        let resolved_type = event_type.resolve().unwrap();
24
25        let mut fields = Vec::new();
26        let mut field_builders = Vec::new();
27
28        for (typ, param) in resolved_type
29            .indexed()
30            .iter()
31            .chain(resolved_type.body().iter())
32            .zip(
33                event_type
34                    .inputs
35                    .iter()
36                    .filter(|f| f.indexed)
37                    .chain(event_type.inputs.iter().filter(|f| !f.indexed)),
38            )
39        {
40            let (field, builder) = Self::event_param_to_field(param, typ);
41            fields.push(field);
42            field_builders.push(builder);
43        }
44
45        Self {
46            schema: Arc::new(Schema::new(fields)),
47            event_decoder: resolved_type,
48            field_builders,
49        }
50    }
51
52    pub fn new_from_signature(signature: &str) -> Result<Self, alloy::dyn_abi::parser::Error> {
53        let event_type = alloy::json_abi::Event::parse(signature)?;
54        Ok(Self::new(&event_type))
55    }
56
57    pub fn push_decoded(&mut self, log: &DecodedEvent) {
58        for (val, builder) in log
59            .indexed
60            .iter()
61            .chain(log.body.iter())
62            .zip(self.field_builders.iter_mut())
63        {
64            builder.append_value(val);
65        }
66    }
67
68    fn event_param_to_field(
69        param: &EventParam,
70        typ: &DynSolType,
71    ) -> (Field, Box<dyn SolidityArrayBuilder + Send>) {
72        match typ {
73            DynSolType::Bool => (
74                Field::new(&param.name, DataType::Boolean, false),
75                Box::<SolidityArrayBuilderBool>::default(),
76            ),
77            DynSolType::Int(64) => (
78                Field::new(&param.name, DataType::Int64, false),
79                Box::<SolidityArrayBuilderInt64>::default(),
80            ),
81            DynSolType::Int(128) => (
82                Field::new(&param.name, DataType::Utf8, false),
83                Box::<SolidityArrayBuilderInt128>::default(),
84            ),
85            DynSolType::Int(256) => (
86                Field::new(&param.name, DataType::Utf8, false),
87                Box::<SolidityArrayBuilderInt256>::default(),
88            ),
89            DynSolType::Uint(64) => (
90                Field::new(&param.name, DataType::UInt64, false),
91                Box::<SolidityArrayBuilderUInt64>::default(),
92            ),
93            DynSolType::Uint(128) => (
94                Field::new(&param.name, DataType::Utf8, false),
95                Box::<SolidityArrayBuilderUInt128>::default(),
96            ),
97            DynSolType::Uint(256) => (
98                Field::new(&param.name, DataType::Utf8, false),
99                Box::<SolidityArrayBuilderUInt256>::default(),
100            ),
101            DynSolType::Address => (
102                Field::new(
103                    &param.name,
104                    // TODO: FIXME: Restore fixed-size after there's better support for it in engines
105                    // DataType::FixedSizeBinary(Address::len_bytes() as i32),
106                    DataType::Binary,
107                    false,
108                ),
109                Box::<SolidityArrayBuilderAddress>::default(),
110            ),
111            DynSolType::Bytes => (
112                Field::new(&param.name, DataType::Binary, false),
113                Box::<SolidityArrayBuilderBytes>::default(),
114            ),
115            _ => unimplemented!(
116                "Support for transcoding {typ} solidity type to arrow is not yet implemented",
117            ),
118        }
119    }
120}
121
122impl Transcoder for EthDecodedLogsToArrow {
123    fn schema(&self) -> SchemaRef {
124        self.schema.clone()
125    }
126
127    fn append(&mut self, logs: &[Log]) -> Result<(), AppendError> {
128        for log in logs {
129            let decoded = self.event_decoder.decode_log_data(log.data())?;
130            self.push_decoded(&decoded);
131        }
132        Ok(())
133    }
134
135    fn len(&self) -> usize {
136        self.field_builders[0].len()
137    }
138
139    fn finish(&mut self) -> RecordBatch {
140        let columns = self.field_builders.iter_mut().map(|b| b.finish()).collect();
141        RecordBatch::try_new(self.schema.clone(), columns).unwrap()
142    }
143}
144
145trait SolidityArrayBuilder {
146    fn append_value(&mut self, value: &DynSolValue);
147    fn len(&self) -> usize;
148    fn finish(&mut self) -> Arc<dyn Array>;
149}
150
151///////////////////////////////////////////////////////////////////////////////////////////////////
152// Builders
153///////////////////////////////////////////////////////////////////////////////////////////////////
154
155#[derive(Default)]
156struct SolidityArrayBuilderBool {
157    builder: array::BooleanBuilder,
158}
159
160impl SolidityArrayBuilder for SolidityArrayBuilderBool {
161    fn append_value(&mut self, value: &DynSolValue) {
162        match value {
163            DynSolValue::Bool(v) => self.builder.append_value(*v),
164            _ => panic!("Unexpected value {value:?}"),
165        }
166    }
167    fn len(&self) -> usize {
168        self.builder.len()
169    }
170    fn finish(&mut self) -> Arc<dyn Array> {
171        Arc::new(self.builder.finish())
172    }
173}
174
175///////////////////////////////////////////////////////////////////////////////////////////////////
176
177#[derive(Default)]
178struct SolidityArrayBuilderInt64 {
179    builder: array::Int64Builder,
180}
181
182impl SolidityArrayBuilder for SolidityArrayBuilderInt64 {
183    fn append_value(&mut self, value: &DynSolValue) {
184        match value {
185            DynSolValue::Int(v, 64) => {
186                let (sign, abs) = v.into_sign_and_abs();
187                let v = match sign {
188                    Sign::Positive => abs.as_limbs()[0] as i64,
189                    Sign::Negative => -(abs.as_limbs()[0] as i64),
190                };
191                self.builder.append_value(v);
192            }
193            _ => panic!("Unexpected value {value:?}"),
194        }
195    }
196    fn len(&self) -> usize {
197        self.builder.len()
198    }
199    fn finish(&mut self) -> Arc<dyn Array> {
200        Arc::new(self.builder.finish())
201    }
202}
203
204#[derive(Default)]
205struct SolidityArrayBuilderUInt64 {
206    builder: array::UInt64Builder,
207}
208
209impl SolidityArrayBuilder for SolidityArrayBuilderUInt64 {
210    fn append_value(&mut self, value: &DynSolValue) {
211        match value {
212            DynSolValue::Uint(v, 64) => self.builder.append_value(v.as_limbs()[0]),
213            _ => panic!("Unexpected value {value:?}"),
214        }
215    }
216    fn len(&self) -> usize {
217        self.builder.len()
218    }
219    fn finish(&mut self) -> Arc<dyn Array> {
220        Arc::new(self.builder.finish())
221    }
222}
223
224///////////////////////////////////////////////////////////////////////////////////////////////////
225
226#[derive(Default)]
227struct SolidityArrayBuilderInt128 {
228    builder: array::StringBuilder,
229}
230
231impl SolidityArrayBuilder for SolidityArrayBuilderInt128 {
232    fn append_value(&mut self, value: &DynSolValue) {
233        match value {
234            DynSolValue::Int(v, 128) => self.builder.append_value(v.to_string()),
235            _ => panic!("Unexpected value {value:?}"),
236        }
237    }
238    fn len(&self) -> usize {
239        self.builder.len()
240    }
241    fn finish(&mut self) -> Arc<dyn Array> {
242        Arc::new(self.builder.finish())
243    }
244}
245
246#[derive(Default)]
247struct SolidityArrayBuilderUInt128 {
248    builder: array::StringBuilder,
249}
250
251impl SolidityArrayBuilder for SolidityArrayBuilderUInt128 {
252    fn append_value(&mut self, value: &DynSolValue) {
253        match value {
254            DynSolValue::Uint(v, 128) => self.builder.append_value(v.to_string()),
255            _ => panic!("Unexpected value {value:?}"),
256        }
257    }
258    fn len(&self) -> usize {
259        self.builder.len()
260    }
261    fn finish(&mut self) -> Arc<dyn Array> {
262        Arc::new(self.builder.finish())
263    }
264}
265
266///////////////////////////////////////////////////////////////////////////////////////////////////
267
268#[derive(Default)]
269struct SolidityArrayBuilderInt256 {
270    builder: array::StringBuilder,
271}
272
273impl SolidityArrayBuilder for SolidityArrayBuilderInt256 {
274    fn append_value(&mut self, value: &DynSolValue) {
275        match value {
276            DynSolValue::Int(v, 256) => self.builder.append_value(v.to_string()),
277            _ => panic!("Unexpected value {value:?}"),
278        }
279    }
280    fn len(&self) -> usize {
281        self.builder.len()
282    }
283    fn finish(&mut self) -> Arc<dyn Array> {
284        Arc::new(self.builder.finish())
285    }
286}
287
288#[derive(Default)]
289struct SolidityArrayBuilderUInt256 {
290    builder: array::StringBuilder,
291}
292
293impl SolidityArrayBuilder for SolidityArrayBuilderUInt256 {
294    fn append_value(&mut self, value: &DynSolValue) {
295        match value {
296            DynSolValue::Uint(v, 256) => self.builder.append_value(v.to_string()),
297            _ => panic!("Unexpected value {value:?}"),
298        }
299    }
300    fn len(&self) -> usize {
301        self.builder.len()
302    }
303    fn finish(&mut self) -> Arc<dyn Array> {
304        Arc::new(self.builder.finish())
305    }
306}
307
308///////////////////////////////////////////////////////////////////////////////////////////////////
309
310#[derive(Default)]
311struct SolidityArrayBuilderAddress {
312    builder: array::BinaryBuilder,
313}
314
315impl SolidityArrayBuilder for SolidityArrayBuilderAddress {
316    fn append_value(&mut self, value: &DynSolValue) {
317        match value {
318            DynSolValue::Address(v) => self.builder.append_value(v.as_slice()),
319            _ => panic!("Unexpected value {value:?}"),
320        }
321    }
322    fn len(&self) -> usize {
323        self.builder.len()
324    }
325    fn finish(&mut self) -> Arc<dyn Array> {
326        Arc::new(self.builder.finish())
327    }
328}
329
330///////////////////////////////////////////////////////////////////////////////////////////////////
331
332#[derive(Default)]
333struct SolidityArrayBuilderBytes {
334    builder: array::BinaryBuilder,
335}
336
337impl SolidityArrayBuilder for SolidityArrayBuilderBytes {
338    fn append_value(&mut self, value: &DynSolValue) {
339        match value {
340            DynSolValue::Bytes(v) => self.builder.append_value(v),
341            _ => panic!("Unexpected value {value:?}"),
342        }
343    }
344    fn len(&self) -> usize {
345        self.builder.len()
346    }
347    fn finish(&mut self) -> Arc<dyn Array> {
348        Arc::new(self.builder.finish())
349    }
350}