1use std::any::Any;
2use std::sync::Arc;
3
4use alloy::dyn_abi::{DecodedEvent, DynSolValue, EventExt as _};
5use alloy::hex::ToHexExt;
6use alloy::json_abi::Event;
7use alloy::primitives::B256;
8use datafusion::arrow::array::{self, Array as _};
9use datafusion::error::DataFusionError;
10use datafusion::execution::FunctionRegistry;
11use datafusion::logical_expr::ScalarFunctionArgs;
12use datafusion::{
13 arrow::datatypes::DataType,
14 common::plan_err,
15 logical_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, Volatility},
16 scalar::ScalarValue,
17};
18
19pub fn event_to_json(schema: &Event, event: &DecodedEvent) -> serde_json::Value {
22 let mut obj = serde_json::Map::new();
23 obj.insert("name".to_string(), schema.name.clone().into());
24
25 let mut i_indexed = 0;
26 let mut i_body = 0;
27
28 for input in &schema.inputs {
29 let value = if input.indexed {
30 let v = &event.indexed[i_indexed];
31 i_indexed += 1;
32 v
33 } else {
34 let v = &event.body[i_body];
35 i_body += 1;
36 v
37 };
38
39 obj.insert(input.name.clone(), solidity_value_to_json(value));
40 }
41
42 obj.into()
43}
44
45pub fn solidity_value_to_json(value: &DynSolValue) -> serde_json::Value {
46 use serde_json::Value;
47 match value {
48 DynSolValue::Bool(v) => Value::Bool(*v),
49 DynSolValue::Int(v, bits) if *bits <= 64 => Value::Number(v.as_i64().into()),
50 DynSolValue::Int(v, _) => Value::String(v.to_string()),
51 DynSolValue::Uint(v, bits) if *bits <= 64 => Value::Number(v.to::<u64>().into()),
52 DynSolValue::Uint(v, _) => Value::String(v.to_string()),
53 DynSolValue::FixedBytes(v, _) => Value::String(v.encode_hex()),
54 DynSolValue::Address(v) => Value::String(v.encode_hex()),
55 DynSolValue::Function(_) => todo!(),
56 DynSolValue::Bytes(v) => Value::String(v.encode_hex()),
57 DynSolValue::String(v) => Value::String(v.clone()),
58 DynSolValue::Array(_) => todo!(),
59 DynSolValue::FixedArray(_) => todo!(),
60 DynSolValue::Tuple(_) => todo!(),
61 }
62}
63
64#[derive(Debug)]
67struct UdfEthDecodeEvent {
68 signature: Signature,
69}
70
71impl UdfEthDecodeEvent {
72 fn new() -> Self {
73 Self {
74 signature: Signature::variadic_any(Volatility::Immutable),
75 }
76 }
77}
78
79impl ScalarUDFImpl for UdfEthDecodeEvent {
80 fn as_any(&self) -> &dyn Any {
81 self
82 }
83
84 fn name(&self) -> &str {
85 "eth_decode_event"
86 }
87
88 fn signature(&self) -> &Signature {
89 &self.signature
90 }
91
92 #[allow(clippy::get_first)]
93 fn return_type(&self, args: &[DataType]) -> datafusion::error::Result<DataType> {
94 if args.len() != 6 {
95 return plan_err!("eth_decode_event accepts 6 arguments: event_signature, topic0, topic1, topic2, topic3, data");
96 }
97 if !matches!(args.get(0), Some(&DataType::Utf8)) {
98 return plan_err!("event_signature must be a Utf8 scalar");
99 }
100 if !matches!(args.get(1), Some(&DataType::Binary)) {
101 return plan_err!("topic0 must be a Binary");
102 }
103 if !matches!(args.get(2), Some(&DataType::Binary)) {
104 return plan_err!("topic1 must be a Binary");
105 }
106 if !matches!(args.get(3), Some(&DataType::Binary)) {
107 return plan_err!("topic2 must be a Binary");
108 }
109 if !matches!(args.get(4), Some(&DataType::Binary)) {
110 return plan_err!("topic3 must be a Binary");
111 }
112 if !matches!(args.get(5), Some(&DataType::Binary)) {
113 return plan_err!("data must be a Binary");
114 }
115 Ok(DataType::Utf8)
116 }
117
118 fn invoke_with_args(
119 &self,
120 args: ScalarFunctionArgs,
121 ) -> datafusion::error::Result<ColumnarValue> {
122 let signature = match &args.args[0] {
123 ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => v,
124 _ => return plan_err!("Event signature must be a Utf8 scalar"),
125 };
126
127 let event = Event::parse(signature)
128 .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
129
130 let args = ColumnarValue::values_to_arrays(&args.args[1..])?;
131 let c_topic0 = datafusion::common::cast::as_binary_array(&args[0])?;
132 let c_topic1 = datafusion::common::cast::as_binary_array(&args[1])?;
133 let c_topic2 = datafusion::common::cast::as_binary_array(&args[2])?;
134 let c_topic3 = datafusion::common::cast::as_binary_array(&args[3])?;
135 let c_data = datafusion::common::cast::as_binary_array(&args[4])?;
136
137 let mut builder = array::StringBuilder::new();
138
139 for i in 0..c_topic0.len() {
140 let mut topics = Vec::with_capacity(4);
141
142 for c_topic in [c_topic0, c_topic1, c_topic2, c_topic3] {
143 if c_topic.is_null(i) {
144 break;
145 }
146 topics.push(B256::try_from(c_topic.value(i)).expect("??"));
147 }
148 let data = c_data.value(i);
149
150 let decoded = event
151 .decode_log_parts(topics, data)
152 .map_err(|e| DataFusionError::External(e.into()))?;
153
154 builder.append_value(event_to_json(&event, &decoded).to_string());
155 }
156
157 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
158 }
159}
160
161#[derive(Debug)]
164struct UdfEthTryDecodeEvent {
165 signature: Signature,
166}
167
168impl UdfEthTryDecodeEvent {
169 fn new() -> Self {
170 Self {
171 signature: Signature::variadic_any(Volatility::Immutable),
172 }
173 }
174}
175
176impl ScalarUDFImpl for UdfEthTryDecodeEvent {
177 fn as_any(&self) -> &dyn Any {
178 self
179 }
180
181 fn name(&self) -> &str {
182 "eth_try_decode_event"
183 }
184
185 fn signature(&self) -> &Signature {
186 &self.signature
187 }
188
189 #[allow(clippy::get_first)]
190 fn return_type(&self, args: &[DataType]) -> datafusion::error::Result<DataType> {
191 if args.len() != 6 {
192 return plan_err!("eth_decode_event accepts 6 arguments: event_signature, topic0, topic1, topic2, topic3, data");
193 }
194 if !matches!(args.get(0), Some(&DataType::Utf8)) {
195 return plan_err!("event_signature must be a Utf8 scalar");
196 }
197 if !matches!(args.get(1), Some(&DataType::Binary)) {
198 return plan_err!("topic0 must be a Binary");
199 }
200 if !matches!(args.get(2), Some(&DataType::Binary)) {
201 return plan_err!("topic1 must be a Binary");
202 }
203 if !matches!(args.get(3), Some(&DataType::Binary)) {
204 return plan_err!("topic2 must be a Binary");
205 }
206 if !matches!(args.get(4), Some(&DataType::Binary)) {
207 return plan_err!("topic3 must be a Binary");
208 }
209 if !matches!(args.get(5), Some(&DataType::Binary)) {
210 return plan_err!("data must be a Binary");
211 }
212 Ok(DataType::Utf8)
213 }
214
215 fn invoke_with_args(
216 &self,
217 args: ScalarFunctionArgs,
218 ) -> datafusion::error::Result<ColumnarValue> {
219 let signature = match &args.args[0] {
220 ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => v,
221 _ => return plan_err!("Event signature must be a Utf8 scalar"),
222 };
223
224 let event = Event::parse(signature)
225 .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
226
227 let args = ColumnarValue::values_to_arrays(&args.args[1..])?;
228 let c_topic0 = datafusion::common::cast::as_binary_array(&args[0])?;
229 let c_topic1 = datafusion::common::cast::as_binary_array(&args[1])?;
230 let c_topic2 = datafusion::common::cast::as_binary_array(&args[2])?;
231 let c_topic3 = datafusion::common::cast::as_binary_array(&args[3])?;
232 let c_data = datafusion::common::cast::as_binary_array(&args[4])?;
233
234 let mut builder = array::StringBuilder::new();
235
236 for i in 0..c_topic0.len() {
237 let mut topics = Vec::with_capacity(4);
238
239 for c_topic in [c_topic0, c_topic1, c_topic2, c_topic3] {
240 if c_topic.is_null(i) {
241 break;
242 }
243 topics.push(B256::try_from(c_topic.value(i)).expect("??"));
244 }
245 let data = c_data.value(i);
246
247 match event.decode_log_parts(topics, data) {
248 Ok(decoded) => builder.append_value(event_to_json(&event, &decoded).to_string()),
249 Err(_) => builder.append_null(),
250 }
251 }
252
253 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
254 }
255}
256
257#[derive(Debug)]
260struct UdfEthEventSelector {
261 signature: Signature,
262}
263
264impl UdfEthEventSelector {
265 fn new() -> Self {
266 Self {
267 signature: Signature::exact(vec![DataType::Utf8], Volatility::Immutable),
268 }
269 }
270}
271
272impl ScalarUDFImpl for UdfEthEventSelector {
273 fn as_any(&self) -> &dyn Any {
274 self
275 }
276
277 fn name(&self) -> &str {
278 "eth_event_selector"
279 }
280
281 fn signature(&self) -> &Signature {
282 &self.signature
283 }
284
285 fn return_type(&self, _args: &[DataType]) -> datafusion::error::Result<DataType> {
286 Ok(DataType::Binary)
287 }
288
289 fn invoke_with_args(
290 &self,
291 args: ScalarFunctionArgs,
292 ) -> datafusion::error::Result<ColumnarValue> {
293 let signature = match &args.args[0] {
294 ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => v,
295 _ => return plan_err!("Event signature must be a Utf8 scalar"),
296 };
297
298 let event = Event::parse(signature)
299 .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
300
301 let mut builder = array::BinaryBuilder::new();
302 builder.append_value(event.selector().as_slice());
303 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
304 }
305}
306
307pub fn register_all(registry: &mut dyn FunctionRegistry) -> datafusion::error::Result<()> {
310 registry.register_udf(Arc::new(ScalarUDF::from(UdfEthDecodeEvent::new())))?;
311 registry.register_udf(Arc::new(ScalarUDF::from(UdfEthTryDecodeEvent::new())))?;
312 registry.register_udf(Arc::new(ScalarUDF::from(UdfEthEventSelector::new())))?;
313 Ok(())
314}
315
316