datafusion_ethers/
udf.rs

1use std::any::Any;
2use std::sync::Arc;
3
4use alloy::dyn_abi::{DecodedEvent, DynSolValue, EventExt as _};
5use alloy::hex::ToHexExt;
6use alloy::json_abi::Event;
7use alloy::primitives::B256;
8use datafusion::arrow::array::{self, Array as _};
9use datafusion::error::DataFusionError;
10use datafusion::execution::FunctionRegistry;
11use datafusion::logical_expr::ScalarFunctionArgs;
12use datafusion::{
13    arrow::datatypes::DataType,
14    common::plan_err,
15    logical_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, Volatility},
16    scalar::ScalarValue,
17};
18
19///////////////////////////////////////////////////////////////////////////////////////////////////
20
21pub fn event_to_json(schema: &Event, event: &DecodedEvent) -> serde_json::Value {
22    let mut obj = serde_json::Map::new();
23    obj.insert("name".to_string(), schema.name.clone().into());
24
25    let mut i_indexed = 0;
26    let mut i_body = 0;
27
28    for input in &schema.inputs {
29        let value = if input.indexed {
30            let v = &event.indexed[i_indexed];
31            i_indexed += 1;
32            v
33        } else {
34            let v = &event.body[i_body];
35            i_body += 1;
36            v
37        };
38
39        obj.insert(input.name.clone(), solidity_value_to_json(value));
40    }
41
42    obj.into()
43}
44
45pub fn solidity_value_to_json(value: &DynSolValue) -> serde_json::Value {
46    use serde_json::Value;
47    match value {
48        DynSolValue::Bool(v) => Value::Bool(*v),
49        DynSolValue::Int(v, bits) if *bits <= 64 => Value::Number(v.as_i64().into()),
50        DynSolValue::Int(v, _) => Value::String(v.to_string()),
51        DynSolValue::Uint(v, bits) if *bits <= 64 => Value::Number(v.to::<u64>().into()),
52        DynSolValue::Uint(v, _) => Value::String(v.to_string()),
53        DynSolValue::FixedBytes(v, _) => Value::String(v.encode_hex()),
54        DynSolValue::Address(v) => Value::String(v.encode_hex()),
55        DynSolValue::Function(_) => todo!(),
56        DynSolValue::Bytes(v) => Value::String(v.encode_hex()),
57        DynSolValue::String(v) => Value::String(v.clone()),
58        DynSolValue::Array(_) => todo!(),
59        DynSolValue::FixedArray(_) => todo!(),
60        DynSolValue::Tuple(_) => todo!(),
61    }
62}
63
64///////////////////////////////////////////////////////////////////////////////////////////////////
65
66#[derive(Debug)]
67struct UdfEthDecodeEvent {
68    signature: Signature,
69}
70
71impl UdfEthDecodeEvent {
72    fn new() -> Self {
73        Self {
74            signature: Signature::variadic_any(Volatility::Immutable),
75        }
76    }
77}
78
79impl ScalarUDFImpl for UdfEthDecodeEvent {
80    fn as_any(&self) -> &dyn Any {
81        self
82    }
83
84    fn name(&self) -> &str {
85        "eth_decode_event"
86    }
87
88    fn signature(&self) -> &Signature {
89        &self.signature
90    }
91
92    #[allow(clippy::get_first)]
93    fn return_type(&self, args: &[DataType]) -> datafusion::error::Result<DataType> {
94        if args.len() != 6 {
95            return plan_err!("eth_decode_event accepts 6 arguments: event_signature, topic0, topic1, topic2, topic3, data");
96        }
97        if !matches!(args.get(0), Some(&DataType::Utf8)) {
98            return plan_err!("event_signature must be a Utf8 scalar");
99        }
100        if !matches!(args.get(1), Some(&DataType::Binary)) {
101            return plan_err!("topic0 must be a Binary");
102        }
103        if !matches!(args.get(2), Some(&DataType::Binary)) {
104            return plan_err!("topic1 must be a Binary");
105        }
106        if !matches!(args.get(3), Some(&DataType::Binary)) {
107            return plan_err!("topic2 must be a Binary");
108        }
109        if !matches!(args.get(4), Some(&DataType::Binary)) {
110            return plan_err!("topic3 must be a Binary");
111        }
112        if !matches!(args.get(5), Some(&DataType::Binary)) {
113            return plan_err!("data must be a Binary");
114        }
115        Ok(DataType::Utf8)
116    }
117
118    fn invoke_with_args(
119        &self,
120        args: ScalarFunctionArgs,
121    ) -> datafusion::error::Result<ColumnarValue> {
122        let signature = match &args.args[0] {
123            ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => v,
124            _ => return plan_err!("Event signature must be a Utf8 scalar"),
125        };
126
127        let event = Event::parse(signature)
128            .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
129
130        let args = ColumnarValue::values_to_arrays(&args.args[1..])?;
131        let c_topic0 = datafusion::common::cast::as_binary_array(&args[0])?;
132        let c_topic1 = datafusion::common::cast::as_binary_array(&args[1])?;
133        let c_topic2 = datafusion::common::cast::as_binary_array(&args[2])?;
134        let c_topic3 = datafusion::common::cast::as_binary_array(&args[3])?;
135        let c_data = datafusion::common::cast::as_binary_array(&args[4])?;
136
137        let mut builder = array::StringBuilder::new();
138
139        for i in 0..c_topic0.len() {
140            let mut topics = Vec::with_capacity(4);
141
142            for c_topic in [c_topic0, c_topic1, c_topic2, c_topic3] {
143                if c_topic.is_null(i) {
144                    break;
145                }
146                topics.push(B256::try_from(c_topic.value(i)).expect("??"));
147            }
148            let data = c_data.value(i);
149
150            let decoded = event
151                .decode_log_parts(topics, data)
152                .map_err(|e| DataFusionError::External(e.into()))?;
153
154            builder.append_value(event_to_json(&event, &decoded).to_string());
155        }
156
157        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
158    }
159}
160
161///////////////////////////////////////////////////////////////////////////////////////////////////
162
163#[derive(Debug)]
164struct UdfEthTryDecodeEvent {
165    signature: Signature,
166}
167
168impl UdfEthTryDecodeEvent {
169    fn new() -> Self {
170        Self {
171            signature: Signature::variadic_any(Volatility::Immutable),
172        }
173    }
174}
175
176impl ScalarUDFImpl for UdfEthTryDecodeEvent {
177    fn as_any(&self) -> &dyn Any {
178        self
179    }
180
181    fn name(&self) -> &str {
182        "eth_try_decode_event"
183    }
184
185    fn signature(&self) -> &Signature {
186        &self.signature
187    }
188
189    #[allow(clippy::get_first)]
190    fn return_type(&self, args: &[DataType]) -> datafusion::error::Result<DataType> {
191        if args.len() != 6 {
192            return plan_err!("eth_decode_event accepts 6 arguments: event_signature, topic0, topic1, topic2, topic3, data");
193        }
194        if !matches!(args.get(0), Some(&DataType::Utf8)) {
195            return plan_err!("event_signature must be a Utf8 scalar");
196        }
197        if !matches!(args.get(1), Some(&DataType::Binary)) {
198            return plan_err!("topic0 must be a Binary");
199        }
200        if !matches!(args.get(2), Some(&DataType::Binary)) {
201            return plan_err!("topic1 must be a Binary");
202        }
203        if !matches!(args.get(3), Some(&DataType::Binary)) {
204            return plan_err!("topic2 must be a Binary");
205        }
206        if !matches!(args.get(4), Some(&DataType::Binary)) {
207            return plan_err!("topic3 must be a Binary");
208        }
209        if !matches!(args.get(5), Some(&DataType::Binary)) {
210            return plan_err!("data must be a Binary");
211        }
212        Ok(DataType::Utf8)
213    }
214
215    fn invoke_with_args(
216        &self,
217        args: ScalarFunctionArgs,
218    ) -> datafusion::error::Result<ColumnarValue> {
219        let signature = match &args.args[0] {
220            ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => v,
221            _ => return plan_err!("Event signature must be a Utf8 scalar"),
222        };
223
224        let event = Event::parse(signature)
225            .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
226
227        let args = ColumnarValue::values_to_arrays(&args.args[1..])?;
228        let c_topic0 = datafusion::common::cast::as_binary_array(&args[0])?;
229        let c_topic1 = datafusion::common::cast::as_binary_array(&args[1])?;
230        let c_topic2 = datafusion::common::cast::as_binary_array(&args[2])?;
231        let c_topic3 = datafusion::common::cast::as_binary_array(&args[3])?;
232        let c_data = datafusion::common::cast::as_binary_array(&args[4])?;
233
234        let mut builder = array::StringBuilder::new();
235
236        for i in 0..c_topic0.len() {
237            let mut topics = Vec::with_capacity(4);
238
239            for c_topic in [c_topic0, c_topic1, c_topic2, c_topic3] {
240                if c_topic.is_null(i) {
241                    break;
242                }
243                topics.push(B256::try_from(c_topic.value(i)).expect("??"));
244            }
245            let data = c_data.value(i);
246
247            match event.decode_log_parts(topics, data) {
248                Ok(decoded) => builder.append_value(event_to_json(&event, &decoded).to_string()),
249                Err(_) => builder.append_null(),
250            }
251        }
252
253        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
254    }
255}
256
257///////////////////////////////////////////////////////////////////////////////////////////////////
258
259#[derive(Debug)]
260struct UdfEthEventSelector {
261    signature: Signature,
262}
263
264impl UdfEthEventSelector {
265    fn new() -> Self {
266        Self {
267            signature: Signature::exact(vec![DataType::Utf8], Volatility::Immutable),
268        }
269    }
270}
271
272impl ScalarUDFImpl for UdfEthEventSelector {
273    fn as_any(&self) -> &dyn Any {
274        self
275    }
276
277    fn name(&self) -> &str {
278        "eth_event_selector"
279    }
280
281    fn signature(&self) -> &Signature {
282        &self.signature
283    }
284
285    fn return_type(&self, _args: &[DataType]) -> datafusion::error::Result<DataType> {
286        Ok(DataType::Binary)
287    }
288
289    fn invoke_with_args(
290        &self,
291        args: ScalarFunctionArgs,
292    ) -> datafusion::error::Result<ColumnarValue> {
293        let signature = match &args.args[0] {
294            ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => v,
295            _ => return plan_err!("Event signature must be a Utf8 scalar"),
296        };
297
298        let event = Event::parse(signature)
299            .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
300
301        let mut builder = array::BinaryBuilder::new();
302        builder.append_value(event.selector().as_slice());
303        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
304    }
305}
306
307///////////////////////////////////////////////////////////////////////////////////////////////////
308
309pub fn register_all(registry: &mut dyn FunctionRegistry) -> datafusion::error::Result<()> {
310    registry.register_udf(Arc::new(ScalarUDF::from(UdfEthDecodeEvent::new())))?;
311    registry.register_udf(Arc::new(ScalarUDF::from(UdfEthTryDecodeEvent::new())))?;
312    registry.register_udf(Arc::new(ScalarUDF::from(UdfEthEventSelector::new())))?;
313    Ok(())
314}
315
316///////////////////////////////////////////////////////////////////////////////////////////////////