datafusion_ethers/
udf.rs

1use std::any::Any;
2use std::sync::Arc;
3
4use alloy::dyn_abi::{DecodedEvent, DynSolValue, EventExt as _};
5use alloy::hex::ToHexExt;
6use alloy::json_abi::Event;
7use alloy::primitives::B256;
8use datafusion::arrow::array::{self, Array as _};
9use datafusion::error::DataFusionError;
10use datafusion::execution::FunctionRegistry;
11use datafusion::logical_expr::ScalarFunctionArgs;
12use datafusion::{
13    arrow::datatypes::DataType,
14    common::plan_err,
15    logical_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, Volatility},
16    scalar::ScalarValue,
17};
18
19///////////////////////////////////////////////////////////////////////////////////////////////////
20
21pub fn event_to_json(schema: &Event, event: &DecodedEvent) -> serde_json::Value {
22    let mut obj = serde_json::Map::new();
23    obj.insert("name".to_string(), schema.name.clone().into());
24
25    let mut i_indexed = 0;
26    let mut i_body = 0;
27
28    for input in &schema.inputs {
29        let value = if input.indexed {
30            let v = &event.indexed[i_indexed];
31            i_indexed += 1;
32            v
33        } else {
34            let v = &event.body[i_body];
35            i_body += 1;
36            v
37        };
38
39        obj.insert(input.name.clone(), solidity_value_to_json(value));
40    }
41
42    obj.into()
43}
44
45pub fn solidity_value_to_json(value: &DynSolValue) -> serde_json::Value {
46    use serde_json::Value;
47    match value {
48        DynSolValue::Bool(v) => Value::Bool(*v),
49        DynSolValue::Int(v, bits) if *bits <= 64 => Value::Number(v.as_i64().into()),
50        DynSolValue::Int(v, _) => Value::String(v.to_string()),
51        DynSolValue::Uint(v, bits) if *bits <= 64 => Value::Number(v.to::<u64>().into()),
52        DynSolValue::Uint(v, _) => Value::String(v.to_string()),
53        DynSolValue::FixedBytes(v, _) => Value::String(v.encode_hex()),
54        DynSolValue::Address(v) => Value::String(v.encode_hex()),
55        DynSolValue::Function(_) => todo!(),
56        DynSolValue::Bytes(v) => Value::String(v.encode_hex()),
57        DynSolValue::String(v) => Value::String(v.clone()),
58        DynSolValue::Array(_) => todo!(),
59        DynSolValue::FixedArray(_) => todo!(),
60        DynSolValue::Tuple(_) => todo!(),
61    }
62}
63
64///////////////////////////////////////////////////////////////////////////////////////////////////
65
66#[derive(Debug, PartialEq, Eq, Hash)]
67struct UdfEthDecodeEvent {
68    signature: Signature,
69}
70
71impl UdfEthDecodeEvent {
72    fn new() -> Self {
73        Self {
74            signature: Signature::variadic_any(Volatility::Immutable),
75        }
76    }
77}
78
79impl ScalarUDFImpl for UdfEthDecodeEvent {
80    fn as_any(&self) -> &dyn Any {
81        self
82    }
83
84    fn name(&self) -> &str {
85        "eth_decode_event"
86    }
87
88    fn signature(&self) -> &Signature {
89        &self.signature
90    }
91
92    #[allow(clippy::get_first)]
93    fn return_type(&self, args: &[DataType]) -> datafusion::error::Result<DataType> {
94        if args.len() != 6 {
95            return plan_err!(
96                "eth_decode_event accepts 6 arguments: event_signature, topic0, topic1, topic2, topic3, data"
97            );
98        }
99        if !matches!(args.get(0), Some(&DataType::Utf8)) {
100            return plan_err!("event_signature must be a Utf8 scalar");
101        }
102        if !matches!(args.get(1), Some(&DataType::Binary)) {
103            return plan_err!("topic0 must be a Binary");
104        }
105        if !matches!(args.get(2), Some(&DataType::Binary)) {
106            return plan_err!("topic1 must be a Binary");
107        }
108        if !matches!(args.get(3), Some(&DataType::Binary)) {
109            return plan_err!("topic2 must be a Binary");
110        }
111        if !matches!(args.get(4), Some(&DataType::Binary)) {
112            return plan_err!("topic3 must be a Binary");
113        }
114        if !matches!(args.get(5), Some(&DataType::Binary)) {
115            return plan_err!("data must be a Binary");
116        }
117        Ok(DataType::Utf8)
118    }
119
120    fn invoke_with_args(
121        &self,
122        args: ScalarFunctionArgs,
123    ) -> datafusion::error::Result<ColumnarValue> {
124        let signature = match &args.args[0] {
125            ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => v,
126            _ => return plan_err!("Event signature must be a Utf8 scalar"),
127        };
128
129        let event = Event::parse(signature)
130            .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
131
132        let args = ColumnarValue::values_to_arrays(&args.args[1..])?;
133        let c_topic0 = datafusion::common::cast::as_binary_array(&args[0])?;
134        let c_topic1 = datafusion::common::cast::as_binary_array(&args[1])?;
135        let c_topic2 = datafusion::common::cast::as_binary_array(&args[2])?;
136        let c_topic3 = datafusion::common::cast::as_binary_array(&args[3])?;
137        let c_data = datafusion::common::cast::as_binary_array(&args[4])?;
138
139        let mut builder = array::StringBuilder::new();
140
141        for i in 0..c_topic0.len() {
142            let mut topics = Vec::with_capacity(4);
143
144            for c_topic in [c_topic0, c_topic1, c_topic2, c_topic3] {
145                if c_topic.is_null(i) {
146                    break;
147                }
148                topics.push(B256::try_from(c_topic.value(i)).expect("??"));
149            }
150            let data = c_data.value(i);
151
152            let decoded = event
153                .decode_log_parts(topics, data)
154                .map_err(|e| DataFusionError::External(e.into()))?;
155
156            builder.append_value(event_to_json(&event, &decoded).to_string());
157        }
158
159        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
160    }
161}
162
163///////////////////////////////////////////////////////////////////////////////////////////////////
164
165#[derive(Debug, PartialEq, Eq, Hash)]
166struct UdfEthTryDecodeEvent {
167    signature: Signature,
168}
169
170impl UdfEthTryDecodeEvent {
171    fn new() -> Self {
172        Self {
173            signature: Signature::variadic_any(Volatility::Immutable),
174        }
175    }
176}
177
178impl ScalarUDFImpl for UdfEthTryDecodeEvent {
179    fn as_any(&self) -> &dyn Any {
180        self
181    }
182
183    fn name(&self) -> &str {
184        "eth_try_decode_event"
185    }
186
187    fn signature(&self) -> &Signature {
188        &self.signature
189    }
190
191    #[allow(clippy::get_first)]
192    fn return_type(&self, args: &[DataType]) -> datafusion::error::Result<DataType> {
193        if args.len() != 6 {
194            return plan_err!(
195                "eth_decode_event accepts 6 arguments: event_signature, topic0, topic1, topic2, topic3, data"
196            );
197        }
198        if !matches!(args.get(0), Some(&DataType::Utf8)) {
199            return plan_err!("event_signature must be a Utf8 scalar");
200        }
201        if !matches!(args.get(1), Some(&DataType::Binary)) {
202            return plan_err!("topic0 must be a Binary");
203        }
204        if !matches!(args.get(2), Some(&DataType::Binary)) {
205            return plan_err!("topic1 must be a Binary");
206        }
207        if !matches!(args.get(3), Some(&DataType::Binary)) {
208            return plan_err!("topic2 must be a Binary");
209        }
210        if !matches!(args.get(4), Some(&DataType::Binary)) {
211            return plan_err!("topic3 must be a Binary");
212        }
213        if !matches!(args.get(5), Some(&DataType::Binary)) {
214            return plan_err!("data must be a Binary");
215        }
216        Ok(DataType::Utf8)
217    }
218
219    fn invoke_with_args(
220        &self,
221        args: ScalarFunctionArgs,
222    ) -> datafusion::error::Result<ColumnarValue> {
223        let signature = match &args.args[0] {
224            ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => v,
225            _ => return plan_err!("Event signature must be a Utf8 scalar"),
226        };
227
228        let event = Event::parse(signature)
229            .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
230
231        let args = ColumnarValue::values_to_arrays(&args.args[1..])?;
232        let c_topic0 = datafusion::common::cast::as_binary_array(&args[0])?;
233        let c_topic1 = datafusion::common::cast::as_binary_array(&args[1])?;
234        let c_topic2 = datafusion::common::cast::as_binary_array(&args[2])?;
235        let c_topic3 = datafusion::common::cast::as_binary_array(&args[3])?;
236        let c_data = datafusion::common::cast::as_binary_array(&args[4])?;
237
238        let mut builder = array::StringBuilder::new();
239
240        for i in 0..c_topic0.len() {
241            let mut topics = Vec::with_capacity(4);
242
243            for c_topic in [c_topic0, c_topic1, c_topic2, c_topic3] {
244                if c_topic.is_null(i) {
245                    break;
246                }
247                topics.push(B256::try_from(c_topic.value(i)).expect("??"));
248            }
249            let data = c_data.value(i);
250
251            match event.decode_log_parts(topics, data) {
252                Ok(decoded) => builder.append_value(event_to_json(&event, &decoded).to_string()),
253                Err(_) => builder.append_null(),
254            }
255        }
256
257        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
258    }
259}
260
261///////////////////////////////////////////////////////////////////////////////////////////////////
262
263#[derive(Debug, PartialEq, Eq, Hash)]
264struct UdfEthEventSelector {
265    signature: Signature,
266}
267
268impl UdfEthEventSelector {
269    fn new() -> Self {
270        Self {
271            signature: Signature::exact(vec![DataType::Utf8], Volatility::Immutable),
272        }
273    }
274}
275
276impl ScalarUDFImpl for UdfEthEventSelector {
277    fn as_any(&self) -> &dyn Any {
278        self
279    }
280
281    fn name(&self) -> &str {
282        "eth_event_selector"
283    }
284
285    fn signature(&self) -> &Signature {
286        &self.signature
287    }
288
289    fn return_type(&self, _args: &[DataType]) -> datafusion::error::Result<DataType> {
290        Ok(DataType::Binary)
291    }
292
293    fn invoke_with_args(
294        &self,
295        args: ScalarFunctionArgs,
296    ) -> datafusion::error::Result<ColumnarValue> {
297        let signature = match &args.args[0] {
298            ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => v,
299            _ => return plan_err!("Event signature must be a Utf8 scalar"),
300        };
301
302        let event = Event::parse(signature)
303            .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
304
305        let mut builder = array::BinaryBuilder::new();
306        builder.append_value(event.selector().as_slice());
307        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
308    }
309}
310
311///////////////////////////////////////////////////////////////////////////////////////////////////
312
313pub fn register_all(registry: &mut dyn FunctionRegistry) -> datafusion::error::Result<()> {
314    registry.register_udf(Arc::new(ScalarUDF::from(UdfEthDecodeEvent::new())))?;
315    registry.register_udf(Arc::new(ScalarUDF::from(UdfEthTryDecodeEvent::new())))?;
316    registry.register_udf(Arc::new(ScalarUDF::from(UdfEthEventSelector::new())))?;
317    Ok(())
318}
319
320///////////////////////////////////////////////////////////////////////////////////////////////////