datafusion_ethers/convert/
decoded.rs

1use alloy::dyn_abi::{DecodedEvent, DynSolEvent, DynSolType, DynSolValue, Specifier};
2use alloy::json_abi::{Event, EventParam};
3use alloy::primitives::Sign;
4use alloy::rpc::types::eth::Log;
5use datafusion::arrow::array::{self, Array, ArrayBuilder, RecordBatch};
6use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
7use std::sync::Arc;
8
9use super::{AppendError, Transcoder};
10
11///////////////////////////////////////////////////////////////////////////////////////////////////
12
13/// Transcodes decoded Ethereum log events into Arrow record batches
14pub struct EthDecodedLogsToArrow {
15    schema: SchemaRef,
16    event_decoder: DynSolEvent,
17    /// Indexed fields go first, then data fields, corresponding to how [DecodedEvent] stores fields
18    field_builders: Vec<Box<dyn SolidityArrayBuilder + Send>>,
19}
20
21impl EthDecodedLogsToArrow {
22    pub fn new(event_type: Event, resolved_type: DynSolEvent) -> Self {
23        let mut fields = Vec::new();
24        let mut field_builders = Vec::new();
25
26        for (typ, param) in resolved_type
27            .indexed()
28            .iter()
29            .chain(resolved_type.body().iter())
30            .zip(
31                event_type
32                    .inputs
33                    .iter()
34                    .filter(|f| f.indexed)
35                    .chain(event_type.inputs.iter().filter(|f| !f.indexed)),
36            )
37        {
38            let (field, builder) = Self::event_param_to_field(param, typ);
39            fields.push(field);
40            field_builders.push(builder);
41        }
42
43        Self {
44            schema: Arc::new(Schema::new(fields)),
45            event_decoder: resolved_type,
46            field_builders,
47        }
48    }
49
50    pub fn new_from_signature(signature: &str) -> Result<Self, alloy::dyn_abi::Error> {
51        let event_type = alloy::json_abi::Event::parse(signature)?;
52        let resolved_type = event_type.resolve()?;
53        Ok(Self::new(event_type, resolved_type))
54    }
55
56    pub fn push_decoded(&mut self, log: &DecodedEvent) {
57        for (val, builder) in log
58            .indexed
59            .iter()
60            .chain(log.body.iter())
61            .zip(self.field_builders.iter_mut())
62        {
63            builder.append_value(val);
64        }
65    }
66
67    fn event_param_to_field(
68        param: &EventParam,
69        typ: &DynSolType,
70    ) -> (Field, Box<dyn SolidityArrayBuilder + Send>) {
71        match typ {
72            DynSolType::Bool => (
73                Field::new(&param.name, DataType::Boolean, false),
74                Box::<SolidityArrayBuilderBool>::default(),
75            ),
76            DynSolType::Int(32) => (
77                Field::new(&param.name, DataType::Int32, false),
78                Box::<SolidityArrayBuilderInt32>::default(),
79            ),
80            DynSolType::Int(64) => (
81                Field::new(&param.name, DataType::Int64, false),
82                Box::<SolidityArrayBuilderInt64>::default(),
83            ),
84            DynSolType::Int(128) => (
85                Field::new(&param.name, DataType::Utf8, false),
86                Box::<SolidityArrayBuilderInt128>::default(),
87            ),
88            DynSolType::Int(256) => (
89                Field::new(&param.name, DataType::Utf8, false),
90                Box::<SolidityArrayBuilderInt256>::default(),
91            ),
92            DynSolType::Uint(32) => (
93                Field::new(&param.name, DataType::UInt32, false),
94                Box::<SolidityArrayBuilderUInt32>::default(),
95            ),
96            DynSolType::Uint(64) => (
97                Field::new(&param.name, DataType::UInt64, false),
98                Box::<SolidityArrayBuilderUInt64>::default(),
99            ),
100            DynSolType::Uint(128) => (
101                Field::new(&param.name, DataType::Utf8, false),
102                Box::<SolidityArrayBuilderUInt128>::default(),
103            ),
104            DynSolType::Uint(256) => (
105                Field::new(&param.name, DataType::Utf8, false),
106                Box::<SolidityArrayBuilderUInt256>::default(),
107            ),
108            DynSolType::Address => (
109                Field::new(
110                    &param.name,
111                    // TODO: FIXME: Restore fixed-size after there's better support for it in engines
112                    // DataType::FixedSizeBinary(Address::len_bytes() as i32),
113                    DataType::Binary,
114                    false,
115                ),
116                Box::<SolidityArrayBuilderAddress>::default(),
117            ),
118            DynSolType::Bytes => (
119                Field::new(&param.name, DataType::Binary, false),
120                Box::<SolidityArrayBuilderBytes>::default(),
121            ),
122            DynSolType::String => (
123                Field::new(&param.name, DataType::Utf8, false),
124                Box::<SolidityArrayBuilderUtf8>::default(),
125            ),
126            _ => unimplemented!(
127                "Support for transcoding {typ} solidity type to arrow is not yet implemented",
128            ),
129        }
130    }
131}
132
133impl Transcoder for EthDecodedLogsToArrow {
134    fn schema(&self) -> SchemaRef {
135        self.schema.clone()
136    }
137
138    fn append(&mut self, logs: &[Log]) -> Result<(), AppendError> {
139        for log in logs {
140            let decoded = self.event_decoder.decode_log_data(log.data())?;
141            self.push_decoded(&decoded);
142        }
143        Ok(())
144    }
145
146    fn len(&self) -> usize {
147        self.field_builders[0].len()
148    }
149
150    fn finish(&mut self) -> RecordBatch {
151        let columns = self.field_builders.iter_mut().map(|b| b.finish()).collect();
152        RecordBatch::try_new(self.schema.clone(), columns).unwrap()
153    }
154}
155
156trait SolidityArrayBuilder {
157    fn append_value(&mut self, value: &DynSolValue);
158    fn len(&self) -> usize;
159    fn finish(&mut self) -> Arc<dyn Array>;
160}
161
162///////////////////////////////////////////////////////////////////////////////////////////////////
163// Builders
164///////////////////////////////////////////////////////////////////////////////////////////////////
165
166#[derive(Default)]
167struct SolidityArrayBuilderBool {
168    builder: array::BooleanBuilder,
169}
170
171impl SolidityArrayBuilder for SolidityArrayBuilderBool {
172    fn append_value(&mut self, value: &DynSolValue) {
173        match value {
174            DynSolValue::Bool(v) => self.builder.append_value(*v),
175            _ => panic!("Unexpected value {value:?}"),
176        }
177    }
178    fn len(&self) -> usize {
179        self.builder.len()
180    }
181    fn finish(&mut self) -> Arc<dyn Array> {
182        Arc::new(self.builder.finish())
183    }
184}
185
186///////////////////////////////////////////////////////////////////////////////////////////////////
187
188#[derive(Default)]
189struct SolidityArrayBuilderInt32 {
190    builder: array::Int32Builder,
191}
192
193impl SolidityArrayBuilder for SolidityArrayBuilderInt32 {
194    fn append_value(&mut self, value: &DynSolValue) {
195        match value {
196            DynSolValue::Int(v, 32) => {
197                let (sign, abs) = v.into_sign_and_abs();
198                let v = match sign {
199                    Sign::Positive => abs.as_limbs()[0] as i32,
200                    Sign::Negative => -(abs.as_limbs()[0] as i32),
201                };
202                self.builder.append_value(v);
203            }
204            _ => panic!("Unexpected value {value:?}"),
205        }
206    }
207    fn len(&self) -> usize {
208        self.builder.len()
209    }
210    fn finish(&mut self) -> Arc<dyn Array> {
211        Arc::new(self.builder.finish())
212    }
213}
214
215#[derive(Default)]
216struct SolidityArrayBuilderUInt32 {
217    builder: array::UInt32Builder,
218}
219
220impl SolidityArrayBuilder for SolidityArrayBuilderUInt32 {
221    fn append_value(&mut self, value: &DynSolValue) {
222        match value {
223            DynSolValue::Uint(v, 32) => self.builder.append_value(v.as_limbs()[0] as u32),
224            _ => panic!("Unexpected value {value:?}"),
225        }
226    }
227    fn len(&self) -> usize {
228        self.builder.len()
229    }
230    fn finish(&mut self) -> Arc<dyn Array> {
231        Arc::new(self.builder.finish())
232    }
233}
234
235///////////////////////////////////////////////////////////////////////////////////////////////////
236
237#[derive(Default)]
238struct SolidityArrayBuilderInt64 {
239    builder: array::Int64Builder,
240}
241
242impl SolidityArrayBuilder for SolidityArrayBuilderInt64 {
243    fn append_value(&mut self, value: &DynSolValue) {
244        match value {
245            DynSolValue::Int(v, 64) => {
246                let (sign, abs) = v.into_sign_and_abs();
247                let v = match sign {
248                    Sign::Positive => abs.as_limbs()[0] as i64,
249                    Sign::Negative => -(abs.as_limbs()[0] as i64),
250                };
251                self.builder.append_value(v);
252            }
253            _ => panic!("Unexpected value {value:?}"),
254        }
255    }
256    fn len(&self) -> usize {
257        self.builder.len()
258    }
259    fn finish(&mut self) -> Arc<dyn Array> {
260        Arc::new(self.builder.finish())
261    }
262}
263
264#[derive(Default)]
265struct SolidityArrayBuilderUInt64 {
266    builder: array::UInt64Builder,
267}
268
269impl SolidityArrayBuilder for SolidityArrayBuilderUInt64 {
270    fn append_value(&mut self, value: &DynSolValue) {
271        match value {
272            DynSolValue::Uint(v, 64) => self.builder.append_value(v.as_limbs()[0]),
273            _ => panic!("Unexpected value {value:?}"),
274        }
275    }
276    fn len(&self) -> usize {
277        self.builder.len()
278    }
279    fn finish(&mut self) -> Arc<dyn Array> {
280        Arc::new(self.builder.finish())
281    }
282}
283
284///////////////////////////////////////////////////////////////////////////////////////////////////
285
286#[derive(Default)]
287struct SolidityArrayBuilderInt128 {
288    builder: array::StringBuilder,
289}
290
291impl SolidityArrayBuilder for SolidityArrayBuilderInt128 {
292    fn append_value(&mut self, value: &DynSolValue) {
293        match value {
294            DynSolValue::Int(v, 128) => self.builder.append_value(v.to_string()),
295            _ => panic!("Unexpected value {value:?}"),
296        }
297    }
298    fn len(&self) -> usize {
299        self.builder.len()
300    }
301    fn finish(&mut self) -> Arc<dyn Array> {
302        Arc::new(self.builder.finish())
303    }
304}
305
306#[derive(Default)]
307struct SolidityArrayBuilderUInt128 {
308    builder: array::StringBuilder,
309}
310
311impl SolidityArrayBuilder for SolidityArrayBuilderUInt128 {
312    fn append_value(&mut self, value: &DynSolValue) {
313        match value {
314            DynSolValue::Uint(v, 128) => self.builder.append_value(v.to_string()),
315            _ => panic!("Unexpected value {value:?}"),
316        }
317    }
318    fn len(&self) -> usize {
319        self.builder.len()
320    }
321    fn finish(&mut self) -> Arc<dyn Array> {
322        Arc::new(self.builder.finish())
323    }
324}
325
326///////////////////////////////////////////////////////////////////////////////////////////////////
327
328#[derive(Default)]
329struct SolidityArrayBuilderInt256 {
330    builder: array::StringBuilder,
331}
332
333impl SolidityArrayBuilder for SolidityArrayBuilderInt256 {
334    fn append_value(&mut self, value: &DynSolValue) {
335        match value {
336            DynSolValue::Int(v, 256) => self.builder.append_value(v.to_string()),
337            _ => panic!("Unexpected value {value:?}"),
338        }
339    }
340    fn len(&self) -> usize {
341        self.builder.len()
342    }
343    fn finish(&mut self) -> Arc<dyn Array> {
344        Arc::new(self.builder.finish())
345    }
346}
347
348#[derive(Default)]
349struct SolidityArrayBuilderUInt256 {
350    builder: array::StringBuilder,
351}
352
353impl SolidityArrayBuilder for SolidityArrayBuilderUInt256 {
354    fn append_value(&mut self, value: &DynSolValue) {
355        match value {
356            DynSolValue::Uint(v, 256) => self.builder.append_value(v.to_string()),
357            _ => panic!("Unexpected value {value:?}"),
358        }
359    }
360    fn len(&self) -> usize {
361        self.builder.len()
362    }
363    fn finish(&mut self) -> Arc<dyn Array> {
364        Arc::new(self.builder.finish())
365    }
366}
367
368///////////////////////////////////////////////////////////////////////////////////////////////////
369
370#[derive(Default)]
371struct SolidityArrayBuilderAddress {
372    builder: array::BinaryBuilder,
373}
374
375impl SolidityArrayBuilder for SolidityArrayBuilderAddress {
376    fn append_value(&mut self, value: &DynSolValue) {
377        match value {
378            DynSolValue::Address(v) => self.builder.append_value(v.as_slice()),
379            _ => panic!("Unexpected value {value:?}"),
380        }
381    }
382    fn len(&self) -> usize {
383        self.builder.len()
384    }
385    fn finish(&mut self) -> Arc<dyn Array> {
386        Arc::new(self.builder.finish())
387    }
388}
389
390///////////////////////////////////////////////////////////////////////////////////////////////////
391
392#[derive(Default)]
393struct SolidityArrayBuilderBytes {
394    builder: array::BinaryBuilder,
395}
396
397impl SolidityArrayBuilder for SolidityArrayBuilderBytes {
398    fn append_value(&mut self, value: &DynSolValue) {
399        match value {
400            DynSolValue::Bytes(v) => self.builder.append_value(v),
401            _ => panic!("Unexpected value {value:?}"),
402        }
403    }
404    fn len(&self) -> usize {
405        self.builder.len()
406    }
407    fn finish(&mut self) -> Arc<dyn Array> {
408        Arc::new(self.builder.finish())
409    }
410}
411
412///////////////////////////////////////////////////////////////////////////////////////////////////
413
414#[derive(Default)]
415struct SolidityArrayBuilderUtf8 {
416    builder: array::StringBuilder,
417}
418
419impl SolidityArrayBuilder for SolidityArrayBuilderUtf8 {
420    fn append_value(&mut self, value: &DynSolValue) {
421        match value {
422            DynSolValue::String(v) => self.builder.append_value(v),
423            _ => panic!("Unexpected value {value:?}"),
424        }
425    }
426    fn len(&self) -> usize {
427        self.builder.len()
428    }
429    fn finish(&mut self) -> Arc<dyn Array> {
430        Arc::new(self.builder.finish())
431    }
432}