datafusion_ethers/convert/
decoded.rs1use alloy::dyn_abi::{DecodedEvent, DynSolEvent, DynSolType, DynSolValue, Specifier};
2use alloy::json_abi::{Event, EventParam};
3use alloy::primitives::Sign;
4use alloy::rpc::types::eth::Log;
5use datafusion::arrow::array::{self, Array, ArrayBuilder, RecordBatch};
6use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
7use std::sync::Arc;
8
9use super::{AppendError, Transcoder};
10
11pub struct EthDecodedLogsToArrow {
15 schema: SchemaRef,
16 event_decoder: DynSolEvent,
17 field_builders: Vec<Box<dyn SolidityArrayBuilder + Send>>,
19}
20
21impl EthDecodedLogsToArrow {
22 pub fn new(event_type: &Event) -> Self {
23 let resolved_type = event_type.resolve().unwrap();
24
25 let mut fields = Vec::new();
26 let mut field_builders = Vec::new();
27
28 for (typ, param) in resolved_type
29 .indexed()
30 .iter()
31 .chain(resolved_type.body().iter())
32 .zip(
33 event_type
34 .inputs
35 .iter()
36 .filter(|f| f.indexed)
37 .chain(event_type.inputs.iter().filter(|f| !f.indexed)),
38 )
39 {
40 let (field, builder) = Self::event_param_to_field(param, typ);
41 fields.push(field);
42 field_builders.push(builder);
43 }
44
45 Self {
46 schema: Arc::new(Schema::new(fields)),
47 event_decoder: resolved_type,
48 field_builders,
49 }
50 }
51
52 pub fn new_from_signature(signature: &str) -> Result<Self, alloy::dyn_abi::parser::Error> {
53 let event_type = alloy::json_abi::Event::parse(signature)?;
54 Ok(Self::new(&event_type))
55 }
56
57 pub fn push_decoded(&mut self, log: &DecodedEvent) {
58 for (val, builder) in log
59 .indexed
60 .iter()
61 .chain(log.body.iter())
62 .zip(self.field_builders.iter_mut())
63 {
64 builder.append_value(val);
65 }
66 }
67
68 fn event_param_to_field(
69 param: &EventParam,
70 typ: &DynSolType,
71 ) -> (Field, Box<dyn SolidityArrayBuilder + Send>) {
72 match typ {
73 DynSolType::Bool => (
74 Field::new(¶m.name, DataType::Boolean, false),
75 Box::<SolidityArrayBuilderBool>::default(),
76 ),
77 DynSolType::Int(64) => (
78 Field::new(¶m.name, DataType::Int64, false),
79 Box::<SolidityArrayBuilderInt64>::default(),
80 ),
81 DynSolType::Int(128) => (
82 Field::new(¶m.name, DataType::Utf8, false),
83 Box::<SolidityArrayBuilderInt128>::default(),
84 ),
85 DynSolType::Int(256) => (
86 Field::new(¶m.name, DataType::Utf8, false),
87 Box::<SolidityArrayBuilderInt256>::default(),
88 ),
89 DynSolType::Uint(64) => (
90 Field::new(¶m.name, DataType::UInt64, false),
91 Box::<SolidityArrayBuilderUInt64>::default(),
92 ),
93 DynSolType::Uint(128) => (
94 Field::new(¶m.name, DataType::Utf8, false),
95 Box::<SolidityArrayBuilderUInt128>::default(),
96 ),
97 DynSolType::Uint(256) => (
98 Field::new(¶m.name, DataType::Utf8, false),
99 Box::<SolidityArrayBuilderUInt256>::default(),
100 ),
101 DynSolType::Address => (
102 Field::new(
103 ¶m.name,
104 DataType::Binary,
107 false,
108 ),
109 Box::<SolidityArrayBuilderAddress>::default(),
110 ),
111 DynSolType::Bytes => (
112 Field::new(¶m.name, DataType::Binary, false),
113 Box::<SolidityArrayBuilderBytes>::default(),
114 ),
115 _ => unimplemented!(
116 "Support for transcoding {typ} solidity type to arrow is not yet implemented",
117 ),
118 }
119 }
120}
121
122impl Transcoder for EthDecodedLogsToArrow {
123 fn schema(&self) -> SchemaRef {
124 self.schema.clone()
125 }
126
127 fn append(&mut self, logs: &[Log]) -> Result<(), AppendError> {
128 for log in logs {
129 let decoded = self.event_decoder.decode_log_data(log.data())?;
130 self.push_decoded(&decoded);
131 }
132 Ok(())
133 }
134
135 fn len(&self) -> usize {
136 self.field_builders[0].len()
137 }
138
139 fn finish(&mut self) -> RecordBatch {
140 let columns = self.field_builders.iter_mut().map(|b| b.finish()).collect();
141 RecordBatch::try_new(self.schema.clone(), columns).unwrap()
142 }
143}
144
145trait SolidityArrayBuilder {
146 fn append_value(&mut self, value: &DynSolValue);
147 fn len(&self) -> usize;
148 fn finish(&mut self) -> Arc<dyn Array>;
149}
150
151#[derive(Default)]
156struct SolidityArrayBuilderBool {
157 builder: array::BooleanBuilder,
158}
159
160impl SolidityArrayBuilder for SolidityArrayBuilderBool {
161 fn append_value(&mut self, value: &DynSolValue) {
162 match value {
163 DynSolValue::Bool(v) => self.builder.append_value(*v),
164 _ => panic!("Unexpected value {value:?}"),
165 }
166 }
167 fn len(&self) -> usize {
168 self.builder.len()
169 }
170 fn finish(&mut self) -> Arc<dyn Array> {
171 Arc::new(self.builder.finish())
172 }
173}
174
175#[derive(Default)]
178struct SolidityArrayBuilderInt64 {
179 builder: array::Int64Builder,
180}
181
182impl SolidityArrayBuilder for SolidityArrayBuilderInt64 {
183 fn append_value(&mut self, value: &DynSolValue) {
184 match value {
185 DynSolValue::Int(v, 64) => {
186 let (sign, abs) = v.into_sign_and_abs();
187 let v = match sign {
188 Sign::Positive => abs.as_limbs()[0] as i64,
189 Sign::Negative => -(abs.as_limbs()[0] as i64),
190 };
191 self.builder.append_value(v);
192 }
193 _ => panic!("Unexpected value {value:?}"),
194 }
195 }
196 fn len(&self) -> usize {
197 self.builder.len()
198 }
199 fn finish(&mut self) -> Arc<dyn Array> {
200 Arc::new(self.builder.finish())
201 }
202}
203
204#[derive(Default)]
205struct SolidityArrayBuilderUInt64 {
206 builder: array::UInt64Builder,
207}
208
209impl SolidityArrayBuilder for SolidityArrayBuilderUInt64 {
210 fn append_value(&mut self, value: &DynSolValue) {
211 match value {
212 DynSolValue::Uint(v, 64) => self.builder.append_value(v.as_limbs()[0]),
213 _ => panic!("Unexpected value {value:?}"),
214 }
215 }
216 fn len(&self) -> usize {
217 self.builder.len()
218 }
219 fn finish(&mut self) -> Arc<dyn Array> {
220 Arc::new(self.builder.finish())
221 }
222}
223
224#[derive(Default)]
227struct SolidityArrayBuilderInt128 {
228 builder: array::StringBuilder,
229}
230
231impl SolidityArrayBuilder for SolidityArrayBuilderInt128 {
232 fn append_value(&mut self, value: &DynSolValue) {
233 match value {
234 DynSolValue::Int(v, 128) => self.builder.append_value(v.to_string()),
235 _ => panic!("Unexpected value {value:?}"),
236 }
237 }
238 fn len(&self) -> usize {
239 self.builder.len()
240 }
241 fn finish(&mut self) -> Arc<dyn Array> {
242 Arc::new(self.builder.finish())
243 }
244}
245
246#[derive(Default)]
247struct SolidityArrayBuilderUInt128 {
248 builder: array::StringBuilder,
249}
250
251impl SolidityArrayBuilder for SolidityArrayBuilderUInt128 {
252 fn append_value(&mut self, value: &DynSolValue) {
253 match value {
254 DynSolValue::Uint(v, 128) => self.builder.append_value(v.to_string()),
255 _ => panic!("Unexpected value {value:?}"),
256 }
257 }
258 fn len(&self) -> usize {
259 self.builder.len()
260 }
261 fn finish(&mut self) -> Arc<dyn Array> {
262 Arc::new(self.builder.finish())
263 }
264}
265
266#[derive(Default)]
269struct SolidityArrayBuilderInt256 {
270 builder: array::StringBuilder,
271}
272
273impl SolidityArrayBuilder for SolidityArrayBuilderInt256 {
274 fn append_value(&mut self, value: &DynSolValue) {
275 match value {
276 DynSolValue::Int(v, 256) => self.builder.append_value(v.to_string()),
277 _ => panic!("Unexpected value {value:?}"),
278 }
279 }
280 fn len(&self) -> usize {
281 self.builder.len()
282 }
283 fn finish(&mut self) -> Arc<dyn Array> {
284 Arc::new(self.builder.finish())
285 }
286}
287
288#[derive(Default)]
289struct SolidityArrayBuilderUInt256 {
290 builder: array::StringBuilder,
291}
292
293impl SolidityArrayBuilder for SolidityArrayBuilderUInt256 {
294 fn append_value(&mut self, value: &DynSolValue) {
295 match value {
296 DynSolValue::Uint(v, 256) => self.builder.append_value(v.to_string()),
297 _ => panic!("Unexpected value {value:?}"),
298 }
299 }
300 fn len(&self) -> usize {
301 self.builder.len()
302 }
303 fn finish(&mut self) -> Arc<dyn Array> {
304 Arc::new(self.builder.finish())
305 }
306}
307
308#[derive(Default)]
311struct SolidityArrayBuilderAddress {
312 builder: array::BinaryBuilder,
313}
314
315impl SolidityArrayBuilder for SolidityArrayBuilderAddress {
316 fn append_value(&mut self, value: &DynSolValue) {
317 match value {
318 DynSolValue::Address(v) => self.builder.append_value(v.as_slice()),
319 _ => panic!("Unexpected value {value:?}"),
320 }
321 }
322 fn len(&self) -> usize {
323 self.builder.len()
324 }
325 fn finish(&mut self) -> Arc<dyn Array> {
326 Arc::new(self.builder.finish())
327 }
328}
329
330#[derive(Default)]
333struct SolidityArrayBuilderBytes {
334 builder: array::BinaryBuilder,
335}
336
337impl SolidityArrayBuilder for SolidityArrayBuilderBytes {
338 fn append_value(&mut self, value: &DynSolValue) {
339 match value {
340 DynSolValue::Bytes(v) => self.builder.append_value(v),
341 _ => panic!("Unexpected value {value:?}"),
342 }
343 }
344 fn len(&self) -> usize {
345 self.builder.len()
346 }
347 fn finish(&mut self) -> Arc<dyn Array> {
348 Arc::new(self.builder.finish())
349 }
350}