datafusion_ethers/convert/
decoded.rs1use alloy::dyn_abi::{DecodedEvent, DynSolEvent, DynSolType, DynSolValue, Specifier};
2use alloy::json_abi::{Event, EventParam};
3use alloy::primitives::Sign;
4use alloy::rpc::types::eth::Log;
5use datafusion::arrow::array::{self, Array, ArrayBuilder, RecordBatch};
6use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
7use std::sync::Arc;
8
9use super::{AppendError, Transcoder};
10
11pub struct EthDecodedLogsToArrow {
15 schema: SchemaRef,
16 event_decoder: DynSolEvent,
17 field_builders: Vec<Box<dyn SolidityArrayBuilder + Send>>,
19}
20
21impl EthDecodedLogsToArrow {
22 pub fn new(event_type: Event, resolved_type: DynSolEvent) -> Self {
23 let mut fields = Vec::new();
24 let mut field_builders = Vec::new();
25
26 for (typ, param) in resolved_type
27 .indexed()
28 .iter()
29 .chain(resolved_type.body().iter())
30 .zip(
31 event_type
32 .inputs
33 .iter()
34 .filter(|f| f.indexed)
35 .chain(event_type.inputs.iter().filter(|f| !f.indexed)),
36 )
37 {
38 let (field, builder) = Self::event_param_to_field(param, typ);
39 fields.push(field);
40 field_builders.push(builder);
41 }
42
43 Self {
44 schema: Arc::new(Schema::new(fields)),
45 event_decoder: resolved_type,
46 field_builders,
47 }
48 }
49
50 pub fn new_from_signature(signature: &str) -> Result<Self, alloy::dyn_abi::Error> {
51 let event_type = alloy::json_abi::Event::parse(signature)?;
52 let resolved_type = event_type.resolve()?;
53 Ok(Self::new(event_type, resolved_type))
54 }
55
56 pub fn push_decoded(&mut self, log: &DecodedEvent) {
57 for (val, builder) in log
58 .indexed
59 .iter()
60 .chain(log.body.iter())
61 .zip(self.field_builders.iter_mut())
62 {
63 builder.append_value(val);
64 }
65 }
66
67 fn event_param_to_field(
68 param: &EventParam,
69 typ: &DynSolType,
70 ) -> (Field, Box<dyn SolidityArrayBuilder + Send>) {
71 match typ {
72 DynSolType::Bool => (
73 Field::new(¶m.name, DataType::Boolean, false),
74 Box::<SolidityArrayBuilderBool>::default(),
75 ),
76 DynSolType::Int(32) => (
77 Field::new(¶m.name, DataType::Int32, false),
78 Box::<SolidityArrayBuilderInt32>::default(),
79 ),
80 DynSolType::Int(64) => (
81 Field::new(¶m.name, DataType::Int64, false),
82 Box::<SolidityArrayBuilderInt64>::default(),
83 ),
84 DynSolType::Int(128) => (
85 Field::new(¶m.name, DataType::Utf8, false),
86 Box::<SolidityArrayBuilderInt128>::default(),
87 ),
88 DynSolType::Int(256) => (
89 Field::new(¶m.name, DataType::Utf8, false),
90 Box::<SolidityArrayBuilderInt256>::default(),
91 ),
92 DynSolType::Uint(32) => (
93 Field::new(¶m.name, DataType::UInt32, false),
94 Box::<SolidityArrayBuilderUInt32>::default(),
95 ),
96 DynSolType::Uint(64) => (
97 Field::new(¶m.name, DataType::UInt64, false),
98 Box::<SolidityArrayBuilderUInt64>::default(),
99 ),
100 DynSolType::Uint(128) => (
101 Field::new(¶m.name, DataType::Utf8, false),
102 Box::<SolidityArrayBuilderUInt128>::default(),
103 ),
104 DynSolType::Uint(256) => (
105 Field::new(¶m.name, DataType::Utf8, false),
106 Box::<SolidityArrayBuilderUInt256>::default(),
107 ),
108 DynSolType::Address => (
109 Field::new(
110 ¶m.name,
111 DataType::Binary,
114 false,
115 ),
116 Box::<SolidityArrayBuilderAddress>::default(),
117 ),
118 DynSolType::Bytes => (
119 Field::new(¶m.name, DataType::Binary, false),
120 Box::<SolidityArrayBuilderBytes>::default(),
121 ),
122 DynSolType::String => (
123 Field::new(¶m.name, DataType::Utf8, false),
124 Box::<SolidityArrayBuilderUtf8>::default(),
125 ),
126 _ => unimplemented!(
127 "Support for transcoding {typ} solidity type to arrow is not yet implemented",
128 ),
129 }
130 }
131}
132
133impl Transcoder for EthDecodedLogsToArrow {
134 fn schema(&self) -> SchemaRef {
135 self.schema.clone()
136 }
137
138 fn append(&mut self, logs: &[Log]) -> Result<(), AppendError> {
139 for log in logs {
140 let decoded = self.event_decoder.decode_log_data(log.data())?;
141 self.push_decoded(&decoded);
142 }
143 Ok(())
144 }
145
146 fn len(&self) -> usize {
147 self.field_builders[0].len()
148 }
149
150 fn finish(&mut self) -> RecordBatch {
151 let columns = self.field_builders.iter_mut().map(|b| b.finish()).collect();
152 RecordBatch::try_new(self.schema.clone(), columns).unwrap()
153 }
154}
155
156trait SolidityArrayBuilder {
157 fn append_value(&mut self, value: &DynSolValue);
158 fn len(&self) -> usize;
159 fn finish(&mut self) -> Arc<dyn Array>;
160}
161
162#[derive(Default)]
167struct SolidityArrayBuilderBool {
168 builder: array::BooleanBuilder,
169}
170
171impl SolidityArrayBuilder for SolidityArrayBuilderBool {
172 fn append_value(&mut self, value: &DynSolValue) {
173 match value {
174 DynSolValue::Bool(v) => self.builder.append_value(*v),
175 _ => panic!("Unexpected value {value:?}"),
176 }
177 }
178 fn len(&self) -> usize {
179 self.builder.len()
180 }
181 fn finish(&mut self) -> Arc<dyn Array> {
182 Arc::new(self.builder.finish())
183 }
184}
185
186#[derive(Default)]
189struct SolidityArrayBuilderInt32 {
190 builder: array::Int32Builder,
191}
192
193impl SolidityArrayBuilder for SolidityArrayBuilderInt32 {
194 fn append_value(&mut self, value: &DynSolValue) {
195 match value {
196 DynSolValue::Int(v, 32) => {
197 let (sign, abs) = v.into_sign_and_abs();
198 let v = match sign {
199 Sign::Positive => abs.as_limbs()[0] as i32,
200 Sign::Negative => -(abs.as_limbs()[0] as i32),
201 };
202 self.builder.append_value(v);
203 }
204 _ => panic!("Unexpected value {value:?}"),
205 }
206 }
207 fn len(&self) -> usize {
208 self.builder.len()
209 }
210 fn finish(&mut self) -> Arc<dyn Array> {
211 Arc::new(self.builder.finish())
212 }
213}
214
215#[derive(Default)]
216struct SolidityArrayBuilderUInt32 {
217 builder: array::UInt32Builder,
218}
219
220impl SolidityArrayBuilder for SolidityArrayBuilderUInt32 {
221 fn append_value(&mut self, value: &DynSolValue) {
222 match value {
223 DynSolValue::Uint(v, 32) => self.builder.append_value(v.as_limbs()[0] as u32),
224 _ => panic!("Unexpected value {value:?}"),
225 }
226 }
227 fn len(&self) -> usize {
228 self.builder.len()
229 }
230 fn finish(&mut self) -> Arc<dyn Array> {
231 Arc::new(self.builder.finish())
232 }
233}
234
235#[derive(Default)]
238struct SolidityArrayBuilderInt64 {
239 builder: array::Int64Builder,
240}
241
242impl SolidityArrayBuilder for SolidityArrayBuilderInt64 {
243 fn append_value(&mut self, value: &DynSolValue) {
244 match value {
245 DynSolValue::Int(v, 64) => {
246 let (sign, abs) = v.into_sign_and_abs();
247 let v = match sign {
248 Sign::Positive => abs.as_limbs()[0] as i64,
249 Sign::Negative => -(abs.as_limbs()[0] as i64),
250 };
251 self.builder.append_value(v);
252 }
253 _ => panic!("Unexpected value {value:?}"),
254 }
255 }
256 fn len(&self) -> usize {
257 self.builder.len()
258 }
259 fn finish(&mut self) -> Arc<dyn Array> {
260 Arc::new(self.builder.finish())
261 }
262}
263
264#[derive(Default)]
265struct SolidityArrayBuilderUInt64 {
266 builder: array::UInt64Builder,
267}
268
269impl SolidityArrayBuilder for SolidityArrayBuilderUInt64 {
270 fn append_value(&mut self, value: &DynSolValue) {
271 match value {
272 DynSolValue::Uint(v, 64) => self.builder.append_value(v.as_limbs()[0]),
273 _ => panic!("Unexpected value {value:?}"),
274 }
275 }
276 fn len(&self) -> usize {
277 self.builder.len()
278 }
279 fn finish(&mut self) -> Arc<dyn Array> {
280 Arc::new(self.builder.finish())
281 }
282}
283
284#[derive(Default)]
287struct SolidityArrayBuilderInt128 {
288 builder: array::StringBuilder,
289}
290
291impl SolidityArrayBuilder for SolidityArrayBuilderInt128 {
292 fn append_value(&mut self, value: &DynSolValue) {
293 match value {
294 DynSolValue::Int(v, 128) => self.builder.append_value(v.to_string()),
295 _ => panic!("Unexpected value {value:?}"),
296 }
297 }
298 fn len(&self) -> usize {
299 self.builder.len()
300 }
301 fn finish(&mut self) -> Arc<dyn Array> {
302 Arc::new(self.builder.finish())
303 }
304}
305
306#[derive(Default)]
307struct SolidityArrayBuilderUInt128 {
308 builder: array::StringBuilder,
309}
310
311impl SolidityArrayBuilder for SolidityArrayBuilderUInt128 {
312 fn append_value(&mut self, value: &DynSolValue) {
313 match value {
314 DynSolValue::Uint(v, 128) => self.builder.append_value(v.to_string()),
315 _ => panic!("Unexpected value {value:?}"),
316 }
317 }
318 fn len(&self) -> usize {
319 self.builder.len()
320 }
321 fn finish(&mut self) -> Arc<dyn Array> {
322 Arc::new(self.builder.finish())
323 }
324}
325
326#[derive(Default)]
329struct SolidityArrayBuilderInt256 {
330 builder: array::StringBuilder,
331}
332
333impl SolidityArrayBuilder for SolidityArrayBuilderInt256 {
334 fn append_value(&mut self, value: &DynSolValue) {
335 match value {
336 DynSolValue::Int(v, 256) => self.builder.append_value(v.to_string()),
337 _ => panic!("Unexpected value {value:?}"),
338 }
339 }
340 fn len(&self) -> usize {
341 self.builder.len()
342 }
343 fn finish(&mut self) -> Arc<dyn Array> {
344 Arc::new(self.builder.finish())
345 }
346}
347
348#[derive(Default)]
349struct SolidityArrayBuilderUInt256 {
350 builder: array::StringBuilder,
351}
352
353impl SolidityArrayBuilder for SolidityArrayBuilderUInt256 {
354 fn append_value(&mut self, value: &DynSolValue) {
355 match value {
356 DynSolValue::Uint(v, 256) => self.builder.append_value(v.to_string()),
357 _ => panic!("Unexpected value {value:?}"),
358 }
359 }
360 fn len(&self) -> usize {
361 self.builder.len()
362 }
363 fn finish(&mut self) -> Arc<dyn Array> {
364 Arc::new(self.builder.finish())
365 }
366}
367
368#[derive(Default)]
371struct SolidityArrayBuilderAddress {
372 builder: array::BinaryBuilder,
373}
374
375impl SolidityArrayBuilder for SolidityArrayBuilderAddress {
376 fn append_value(&mut self, value: &DynSolValue) {
377 match value {
378 DynSolValue::Address(v) => self.builder.append_value(v.as_slice()),
379 _ => panic!("Unexpected value {value:?}"),
380 }
381 }
382 fn len(&self) -> usize {
383 self.builder.len()
384 }
385 fn finish(&mut self) -> Arc<dyn Array> {
386 Arc::new(self.builder.finish())
387 }
388}
389
390#[derive(Default)]
393struct SolidityArrayBuilderBytes {
394 builder: array::BinaryBuilder,
395}
396
397impl SolidityArrayBuilder for SolidityArrayBuilderBytes {
398 fn append_value(&mut self, value: &DynSolValue) {
399 match value {
400 DynSolValue::Bytes(v) => self.builder.append_value(v),
401 _ => panic!("Unexpected value {value:?}"),
402 }
403 }
404 fn len(&self) -> usize {
405 self.builder.len()
406 }
407 fn finish(&mut self) -> Arc<dyn Array> {
408 Arc::new(self.builder.finish())
409 }
410}
411
412#[derive(Default)]
415struct SolidityArrayBuilderUtf8 {
416 builder: array::StringBuilder,
417}
418
419impl SolidityArrayBuilder for SolidityArrayBuilderUtf8 {
420 fn append_value(&mut self, value: &DynSolValue) {
421 match value {
422 DynSolValue::String(v) => self.builder.append_value(v),
423 _ => panic!("Unexpected value {value:?}"),
424 }
425 }
426 fn len(&self) -> usize {
427 self.builder.len()
428 }
429 fn finish(&mut self) -> Arc<dyn Array> {
430 Arc::new(self.builder.finish())
431 }
432}