1use std::any::Any;
2use std::sync::Arc;
3
4use alloy::dyn_abi::{DecodedEvent, DynSolValue, EventExt as _};
5use alloy::hex::ToHexExt;
6use alloy::json_abi::Event;
7use alloy::primitives::B256;
8use datafusion::arrow::array::{self, Array as _};
9use datafusion::error::DataFusionError;
10use datafusion::execution::FunctionRegistry;
11use datafusion::logical_expr::ScalarFunctionArgs;
12use datafusion::{
13 arrow::datatypes::DataType,
14 common::plan_err,
15 logical_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, Volatility},
16 scalar::ScalarValue,
17};
18
19pub fn event_to_json(schema: &Event, event: &DecodedEvent) -> serde_json::Value {
22 let mut obj = serde_json::Map::new();
23 obj.insert("name".to_string(), schema.name.clone().into());
24
25 let mut i_indexed = 0;
26 let mut i_body = 0;
27
28 for input in &schema.inputs {
29 let value = if input.indexed {
30 let v = &event.indexed[i_indexed];
31 i_indexed += 1;
32 v
33 } else {
34 let v = &event.body[i_body];
35 i_body += 1;
36 v
37 };
38
39 obj.insert(input.name.clone(), solidity_value_to_json(value));
40 }
41
42 obj.into()
43}
44
45pub fn solidity_value_to_json(value: &DynSolValue) -> serde_json::Value {
46 use serde_json::Value;
47 match value {
48 DynSolValue::Bool(v) => Value::Bool(*v),
49 DynSolValue::Int(v, bits) if *bits <= 64 => Value::Number(v.as_i64().into()),
50 DynSolValue::Int(v, _) => Value::String(v.to_string()),
51 DynSolValue::Uint(v, bits) if *bits <= 64 => Value::Number(v.to::<u64>().into()),
52 DynSolValue::Uint(v, _) => Value::String(v.to_string()),
53 DynSolValue::FixedBytes(v, _) => Value::String(v.encode_hex()),
54 DynSolValue::Address(v) => Value::String(v.encode_hex()),
55 DynSolValue::Function(_) => todo!(),
56 DynSolValue::Bytes(v) => Value::String(v.encode_hex()),
57 DynSolValue::String(v) => Value::String(v.clone()),
58 DynSolValue::Array(_) => todo!(),
59 DynSolValue::FixedArray(_) => todo!(),
60 DynSolValue::Tuple(_) => todo!(),
61 }
62}
63
64#[derive(Debug, PartialEq, Eq, Hash)]
67struct UdfEthDecodeEvent {
68 signature: Signature,
69}
70
71impl UdfEthDecodeEvent {
72 fn new() -> Self {
73 Self {
74 signature: Signature::variadic_any(Volatility::Immutable),
75 }
76 }
77}
78
79impl ScalarUDFImpl for UdfEthDecodeEvent {
80 fn as_any(&self) -> &dyn Any {
81 self
82 }
83
84 fn name(&self) -> &str {
85 "eth_decode_event"
86 }
87
88 fn signature(&self) -> &Signature {
89 &self.signature
90 }
91
92 #[allow(clippy::get_first)]
93 fn return_type(&self, args: &[DataType]) -> datafusion::error::Result<DataType> {
94 if args.len() != 6 {
95 return plan_err!(
96 "eth_decode_event accepts 6 arguments: event_signature, topic0, topic1, topic2, topic3, data"
97 );
98 }
99 if !matches!(args.get(0), Some(&DataType::Utf8)) {
100 return plan_err!("event_signature must be a Utf8 scalar");
101 }
102 if !matches!(args.get(1), Some(&DataType::Binary)) {
103 return plan_err!("topic0 must be a Binary");
104 }
105 if !matches!(args.get(2), Some(&DataType::Binary)) {
106 return plan_err!("topic1 must be a Binary");
107 }
108 if !matches!(args.get(3), Some(&DataType::Binary)) {
109 return plan_err!("topic2 must be a Binary");
110 }
111 if !matches!(args.get(4), Some(&DataType::Binary)) {
112 return plan_err!("topic3 must be a Binary");
113 }
114 if !matches!(args.get(5), Some(&DataType::Binary)) {
115 return plan_err!("data must be a Binary");
116 }
117 Ok(DataType::Utf8)
118 }
119
120 fn invoke_with_args(
121 &self,
122 args: ScalarFunctionArgs,
123 ) -> datafusion::error::Result<ColumnarValue> {
124 let signature = match &args.args[0] {
125 ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => v,
126 _ => return plan_err!("Event signature must be a Utf8 scalar"),
127 };
128
129 let event = Event::parse(signature)
130 .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
131
132 let args = ColumnarValue::values_to_arrays(&args.args[1..])?;
133 let c_topic0 = datafusion::common::cast::as_binary_array(&args[0])?;
134 let c_topic1 = datafusion::common::cast::as_binary_array(&args[1])?;
135 let c_topic2 = datafusion::common::cast::as_binary_array(&args[2])?;
136 let c_topic3 = datafusion::common::cast::as_binary_array(&args[3])?;
137 let c_data = datafusion::common::cast::as_binary_array(&args[4])?;
138
139 let mut builder = array::StringBuilder::new();
140
141 for i in 0..c_topic0.len() {
142 let mut topics = Vec::with_capacity(4);
143
144 for c_topic in [c_topic0, c_topic1, c_topic2, c_topic3] {
145 if c_topic.is_null(i) {
146 break;
147 }
148 topics.push(B256::try_from(c_topic.value(i)).expect("??"));
149 }
150 let data = c_data.value(i);
151
152 let decoded = event
153 .decode_log_parts(topics, data)
154 .map_err(|e| DataFusionError::External(e.into()))?;
155
156 builder.append_value(event_to_json(&event, &decoded).to_string());
157 }
158
159 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
160 }
161}
162
163#[derive(Debug, PartialEq, Eq, Hash)]
166struct UdfEthTryDecodeEvent {
167 signature: Signature,
168}
169
170impl UdfEthTryDecodeEvent {
171 fn new() -> Self {
172 Self {
173 signature: Signature::variadic_any(Volatility::Immutable),
174 }
175 }
176}
177
178impl ScalarUDFImpl for UdfEthTryDecodeEvent {
179 fn as_any(&self) -> &dyn Any {
180 self
181 }
182
183 fn name(&self) -> &str {
184 "eth_try_decode_event"
185 }
186
187 fn signature(&self) -> &Signature {
188 &self.signature
189 }
190
191 #[allow(clippy::get_first)]
192 fn return_type(&self, args: &[DataType]) -> datafusion::error::Result<DataType> {
193 if args.len() != 6 {
194 return plan_err!(
195 "eth_decode_event accepts 6 arguments: event_signature, topic0, topic1, topic2, topic3, data"
196 );
197 }
198 if !matches!(args.get(0), Some(&DataType::Utf8)) {
199 return plan_err!("event_signature must be a Utf8 scalar");
200 }
201 if !matches!(args.get(1), Some(&DataType::Binary)) {
202 return plan_err!("topic0 must be a Binary");
203 }
204 if !matches!(args.get(2), Some(&DataType::Binary)) {
205 return plan_err!("topic1 must be a Binary");
206 }
207 if !matches!(args.get(3), Some(&DataType::Binary)) {
208 return plan_err!("topic2 must be a Binary");
209 }
210 if !matches!(args.get(4), Some(&DataType::Binary)) {
211 return plan_err!("topic3 must be a Binary");
212 }
213 if !matches!(args.get(5), Some(&DataType::Binary)) {
214 return plan_err!("data must be a Binary");
215 }
216 Ok(DataType::Utf8)
217 }
218
219 fn invoke_with_args(
220 &self,
221 args: ScalarFunctionArgs,
222 ) -> datafusion::error::Result<ColumnarValue> {
223 let signature = match &args.args[0] {
224 ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => v,
225 _ => return plan_err!("Event signature must be a Utf8 scalar"),
226 };
227
228 let event = Event::parse(signature)
229 .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
230
231 let args = ColumnarValue::values_to_arrays(&args.args[1..])?;
232 let c_topic0 = datafusion::common::cast::as_binary_array(&args[0])?;
233 let c_topic1 = datafusion::common::cast::as_binary_array(&args[1])?;
234 let c_topic2 = datafusion::common::cast::as_binary_array(&args[2])?;
235 let c_topic3 = datafusion::common::cast::as_binary_array(&args[3])?;
236 let c_data = datafusion::common::cast::as_binary_array(&args[4])?;
237
238 let mut builder = array::StringBuilder::new();
239
240 for i in 0..c_topic0.len() {
241 let mut topics = Vec::with_capacity(4);
242
243 for c_topic in [c_topic0, c_topic1, c_topic2, c_topic3] {
244 if c_topic.is_null(i) {
245 break;
246 }
247 topics.push(B256::try_from(c_topic.value(i)).expect("??"));
248 }
249 let data = c_data.value(i);
250
251 match event.decode_log_parts(topics, data) {
252 Ok(decoded) => builder.append_value(event_to_json(&event, &decoded).to_string()),
253 Err(_) => builder.append_null(),
254 }
255 }
256
257 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
258 }
259}
260
261#[derive(Debug, PartialEq, Eq, Hash)]
264struct UdfEthEventSelector {
265 signature: Signature,
266}
267
268impl UdfEthEventSelector {
269 fn new() -> Self {
270 Self {
271 signature: Signature::exact(vec![DataType::Utf8], Volatility::Immutable),
272 }
273 }
274}
275
276impl ScalarUDFImpl for UdfEthEventSelector {
277 fn as_any(&self) -> &dyn Any {
278 self
279 }
280
281 fn name(&self) -> &str {
282 "eth_event_selector"
283 }
284
285 fn signature(&self) -> &Signature {
286 &self.signature
287 }
288
289 fn return_type(&self, _args: &[DataType]) -> datafusion::error::Result<DataType> {
290 Ok(DataType::Binary)
291 }
292
293 fn invoke_with_args(
294 &self,
295 args: ScalarFunctionArgs,
296 ) -> datafusion::error::Result<ColumnarValue> {
297 let signature = match &args.args[0] {
298 ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => v,
299 _ => return plan_err!("Event signature must be a Utf8 scalar"),
300 };
301
302 let event = Event::parse(signature)
303 .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
304
305 let mut builder = array::BinaryBuilder::new();
306 builder.append_value(event.selector().as_slice());
307 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
308 }
309}
310
311pub fn register_all(registry: &mut dyn FunctionRegistry) -> datafusion::error::Result<()> {
314 registry.register_udf(Arc::new(ScalarUDF::from(UdfEthDecodeEvent::new())))?;
315 registry.register_udf(Arc::new(ScalarUDF::from(UdfEthTryDecodeEvent::new())))?;
316 registry.register_udf(Arc::new(ScalarUDF::from(UdfEthEventSelector::new())))?;
317 Ok(())
318}
319
320