1use crate::engine::error::Result;
2use crate::engine::executor::ArenaContext;
3use crate::engine::functions::filter::FilterConfig;
4use crate::engine::functions::integration::{EnrichConfig, HttpCallConfig, PublishKafkaConfig};
5use crate::engine::functions::log::LogConfig;
6use crate::engine::functions::map::MapConfig;
7use crate::engine::functions::parse::{
8 ParseConfig, execute_parse_json_in_arena, execute_parse_xml,
9};
10use crate::engine::functions::publish::{PublishConfig, execute_publish_json, execute_publish_xml};
11use crate::engine::functions::validation::ValidationConfig;
12use crate::engine::message::{Change, Message};
13use crate::engine::task_outcome::TaskOutcome;
14use datalogic_rs::Engine;
15use serde::de::DeserializeOwned;
16use serde::{Deserialize, Deserializer};
17use serde_json::Value;
18use std::any::Any;
19use std::sync::Arc;
20
/// Type-erased, reference-counted payload attached to a custom function.
///
/// The wrapped value can be any `Send + Sync` type; callers recover the
/// concrete type through [`CompiledCustomInput::as_any`] and
/// `downcast_ref`. Cloning only clones the `Arc`, so every clone shares the
/// same underlying value.
#[derive(Clone)]
pub struct CompiledCustomInput(pub Arc<dyn Any + Send + Sync>);

impl CompiledCustomInput {
    /// Borrows the payload as a `dyn Any` so it can be downcast by the caller.
    #[inline]
    pub fn as_any(&self) -> &(dyn Any + Send + Sync) {
        let Self(shared) = self;
        shared.as_ref()
    }
}

/// The payload's concrete type is erased, so `Debug` prints a fixed
/// placeholder rather than the contents.
impl std::fmt::Debug for CompiledCustomInput {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "CompiledCustomInput(<opaque>)")
    }
}
46
/// Configuration of a single function call, tagged by the function's name.
///
/// Every built-in variant pairs a marker `name` with the typed config that
/// was parsed from the JSON `input` object. Any name that is not a built-in
/// ends up in [`FunctionConfig::Custom`], whose input is kept as raw JSON.
#[derive(Debug, Clone)]
pub enum FunctionConfig {
    /// Built-in `map` function.
    Map {
        name: MapName,
        input: MapConfig,
    },
    /// Built-in validation function; accepted as `validate` or `validation`.
    Validation {
        name: ValidationName,
        input: ValidationConfig,
    },
    /// Built-in `parse_json` function.
    ParseJson {
        name: ParseJsonName,
        input: ParseConfig,
    },
    /// Built-in `parse_xml` function (shares `ParseConfig` with `parse_json`).
    ParseXml {
        name: ParseXmlName,
        input: ParseConfig,
    },
    /// Built-in `publish_json` function.
    PublishJson {
        name: PublishJsonName,
        input: PublishConfig,
    },
    /// Built-in `publish_xml` function (shares `PublishConfig` with `publish_json`).
    PublishXml {
        name: PublishXmlName,
        input: PublishConfig,
    },
    /// Built-in `filter` function.
    Filter {
        name: FilterName,
        input: FilterConfig,
    },
    /// Built-in `log` function.
    Log {
        name: LogName,
        input: LogConfig,
    },
    /// Built-in `http_call` function.
    HttpCall {
        name: HttpCallName,
        input: HttpCallConfig,
    },
    /// Built-in `enrich` function.
    Enrich {
        name: EnrichName,
        input: EnrichConfig,
    },
    /// Built-in `publish_kafka` function.
    PublishKafka {
        name: PublishKafkaName,
        input: PublishKafkaConfig,
    },
    /// Any function whose name is not one of the built-ins.
    Custom {
        /// Function name exactly as it appeared in the config.
        name: String,
        /// Raw, unparsed JSON input.
        input: Value,
        /// Optional type-erased companion to `input`. It is always `None`
        /// right after deserialization.
        // NOTE(review): presumably filled in by a later compile step — confirm against the engine.
        compiled_input: Option<CompiledCustomInput>,
    },
}
114
115#[derive(Debug, Clone, Deserialize)]
116#[serde(rename_all = "lowercase")]
117pub enum MapName {
118 Map,
119}
120
/// Marker for the `name` field of [`FunctionConfig::Validation`].
///
/// Two spellings are accepted; the variant records which one the config used.
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum ValidationName {
    /// Deserialized from `"validation"`.
    Validation,
    /// Deserialized from `"validate"`.
    Validate,
}
127
/// Marker for the `name` field of [`FunctionConfig::ParseJson`].
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ParseJsonName {
    /// Deserialized from `"parse_json"`.
    ParseJson,
}
133
/// Marker for the `name` field of [`FunctionConfig::ParseXml`].
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ParseXmlName {
    /// Deserialized from `"parse_xml"`.
    ParseXml,
}
139
/// Marker for the `name` field of [`FunctionConfig::PublishJson`].
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum PublishJsonName {
    /// Deserialized from `"publish_json"`.
    PublishJson,
}
145
/// Marker for the `name` field of [`FunctionConfig::PublishXml`].
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum PublishXmlName {
    /// Deserialized from `"publish_xml"`.
    PublishXml,
}
151
/// Marker for the `name` field of [`FunctionConfig::Filter`].
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum FilterName {
    /// Deserialized from `"filter"`.
    Filter,
}
157
/// Marker for the `name` field of [`FunctionConfig::Log`].
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum LogName {
    /// Deserialized from `"log"`.
    Log,
}
163
/// Marker for the `name` field of [`FunctionConfig::HttpCall`].
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum HttpCallName {
    /// Deserialized from `"http_call"`.
    HttpCall,
}
169
/// Marker for the `name` field of [`FunctionConfig::Enrich`].
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum EnrichName {
    /// Deserialized from `"enrich"`.
    Enrich,
}
175
/// Marker for the `name` field of [`FunctionConfig::PublishKafka`].
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum PublishKafkaName {
    /// Deserialized from `"publish_kafka"`.
    PublishKafka,
}
181
/// Every function name that `FunctionConfig`'s deserializer routes to a
/// built-in variant; any other name becomes `FunctionConfig::Custom`.
///
/// Keep this list in sync with the `match` in the `Deserialize` impl. The
/// unit tests in this file iterate it to check that no listed name silently
/// falls through to `Custom`.
pub(crate) const BUILTIN_FUNCTION_NAMES: &[&str] = &[
    "map",
    "validation",
    "validate",
    "parse_json",
    "parse_xml",
    "publish_json",
    "publish_xml",
    "filter",
    "log",
    "http_call",
    "enrich",
    "publish_kafka",
];
200
201fn parse_function_input<T, E>(func: &str, input: Value) -> std::result::Result<T, E>
208where
209 T: DeserializeOwned,
210 E: serde::de::Error,
211{
212 serde_json::from_value::<T>(input).map_err(|err| {
213 let raw = err.to_string();
214 let trimmed = raw
215 .rsplit_once(" at line ")
216 .map(|(head, _)| head)
217 .unwrap_or(&raw);
218 E::custom(format!("config for function '{func}': {trimmed}"))
219 })
220}
221
222impl<'de> Deserialize<'de> for FunctionConfig {
223 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
224 where
225 D: Deserializer<'de>,
226 {
227 #[derive(Deserialize)]
231 struct Raw {
232 name: String,
233 input: Value,
234 }
235
236 let Raw { name, input } = Raw::deserialize(deserializer)?;
237
238 Ok(match name.as_str() {
239 "map" => FunctionConfig::Map {
240 name: MapName::Map,
241 input: parse_function_input("map", input)?,
242 },
243 "validate" => FunctionConfig::Validation {
244 name: ValidationName::Validate,
245 input: parse_function_input("validate", input)?,
246 },
247 "validation" => FunctionConfig::Validation {
248 name: ValidationName::Validation,
249 input: parse_function_input("validation", input)?,
250 },
251 "parse_json" => FunctionConfig::ParseJson {
252 name: ParseJsonName::ParseJson,
253 input: parse_function_input("parse_json", input)?,
254 },
255 "parse_xml" => FunctionConfig::ParseXml {
256 name: ParseXmlName::ParseXml,
257 input: parse_function_input("parse_xml", input)?,
258 },
259 "publish_json" => FunctionConfig::PublishJson {
260 name: PublishJsonName::PublishJson,
261 input: parse_function_input("publish_json", input)?,
262 },
263 "publish_xml" => FunctionConfig::PublishXml {
264 name: PublishXmlName::PublishXml,
265 input: parse_function_input("publish_xml", input)?,
266 },
267 "filter" => FunctionConfig::Filter {
268 name: FilterName::Filter,
269 input: parse_function_input("filter", input)?,
270 },
271 "log" => FunctionConfig::Log {
272 name: LogName::Log,
273 input: parse_function_input("log", input)?,
274 },
275 "http_call" => FunctionConfig::HttpCall {
276 name: HttpCallName::HttpCall,
277 input: parse_function_input("http_call", input)?,
278 },
279 "enrich" => FunctionConfig::Enrich {
280 name: EnrichName::Enrich,
281 input: parse_function_input("enrich", input)?,
282 },
283 "publish_kafka" => FunctionConfig::PublishKafka {
284 name: PublishKafkaName::PublishKafka,
285 input: parse_function_input("publish_kafka", input)?,
286 },
287 _ => FunctionConfig::Custom {
288 name,
289 input,
290 compiled_input: None,
291 },
292 })
293 }
294}
295
296impl FunctionConfig {
297 pub fn function_name(&self) -> &str {
299 match self {
300 FunctionConfig::Map { .. } => "map",
301 FunctionConfig::Validation { .. } => "validate",
302 FunctionConfig::ParseJson { .. } => "parse_json",
303 FunctionConfig::ParseXml { .. } => "parse_xml",
304 FunctionConfig::PublishJson { .. } => "publish_json",
305 FunctionConfig::PublishXml { .. } => "publish_xml",
306 FunctionConfig::Filter { .. } => "filter",
307 FunctionConfig::Log { .. } => "log",
308 FunctionConfig::HttpCall { .. } => "http_call",
309 FunctionConfig::Enrich { .. } => "enrich",
310 FunctionConfig::PublishKafka { .. } => "publish_kafka",
311 FunctionConfig::Custom { name, .. } => name,
312 }
313 }
314
315 pub fn is_sync_builtin(&self) -> bool {
322 matches!(
323 self,
324 FunctionConfig::Map { .. }
325 | FunctionConfig::Validation { .. }
326 | FunctionConfig::ParseJson { .. }
327 | FunctionConfig::ParseXml { .. }
328 | FunctionConfig::PublishJson { .. }
329 | FunctionConfig::PublishXml { .. }
330 | FunctionConfig::Filter { .. }
331 | FunctionConfig::Log { .. }
332 )
333 }
334
335 pub(crate) fn try_execute_in_arena(
349 &self,
350 message: &mut Message,
351 arena_ctx: &mut ArenaContext<'_>,
352 engine: &Arc<Engine>,
353 map_snapshot_buf: Option<&mut Vec<Value>>,
354 ) -> Option<Result<(TaskOutcome, Vec<Change>)>> {
355 match self {
356 FunctionConfig::Map { input, .. } => {
357 Some(input.execute_in_arena(message, arena_ctx, engine, map_snapshot_buf))
358 }
359 FunctionConfig::Validation { input, .. } => {
360 Some(input.execute_in_arena(message, arena_ctx, engine))
361 }
362 FunctionConfig::ParseJson { input, .. } => {
363 Some(execute_parse_json_in_arena(message, input, arena_ctx))
364 }
365 FunctionConfig::ParseXml { input, .. } => {
366 Some(match execute_parse_xml(message, input) {
369 Ok(r) => {
370 arena_ctx.refresh_for_path(&message.context, "data");
371 Ok(r)
372 }
373 Err(e) => Err(e),
374 })
375 }
376 FunctionConfig::PublishJson { input, .. } => {
377 Some(match execute_publish_json(message, input) {
382 Ok(r) => {
383 arena_ctx.refresh_for_path(&message.context, "data");
384 Ok(r)
385 }
386 Err(e) => Err(e),
387 })
388 }
389 FunctionConfig::PublishXml { input, .. } => {
390 Some(match execute_publish_xml(message, input) {
391 Ok(r) => {
392 arena_ctx.refresh_for_path(&message.context, "data");
393 Ok(r)
394 }
395 Err(e) => Err(e),
396 })
397 }
398 FunctionConfig::Filter { input, .. } => {
399 Some(input.execute_in_arena(message, arena_ctx, engine))
400 }
401 FunctionConfig::Log { input, .. } => {
402 Some(input.execute_in_arena(message, arena_ctx, engine))
403 }
404 FunctionConfig::HttpCall { .. }
405 | FunctionConfig::Enrich { .. }
406 | FunctionConfig::PublishKafka { .. }
407 | FunctionConfig::Custom { .. } => None,
408 }
409 }
410}
411
/// Unit tests for `FunctionConfig`'s hand-written `Deserialize` impl.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Runs a JSON value through `FunctionConfig`'s deserializer.
    fn parse(value: serde_json::Value) -> std::result::Result<FunctionConfig, serde_json::Error> {
        serde_json::from_value::<FunctionConfig>(value)
    }

    /// Parses `value`, requires it to fail, and returns the rendered error.
    fn failure_message(value: serde_json::Value, why: &str) -> String {
        parse(value).expect_err(why).to_string()
    }

    #[test]
    fn map_with_valid_config_deserializes_to_map_variant() {
        let document = json!({
            "name": "map",
            "input": {
                "mappings": [
                    { "path": "data.x", "logic": { "var": "data.y" } }
                ]
            }
        });
        let parsed = parse(document).expect("valid map config should deserialize");
        assert!(matches!(parsed, FunctionConfig::Map { .. }));
    }

    #[test]
    fn map_with_missing_mappings_gives_clear_error() {
        let msg = failure_message(
            json!({
                "name": "map",
                "input": {}
            }),
            "map with empty input should fail",
        );
        assert!(
            msg.starts_with("config for function 'map':"),
            "error should be prefixed with function envelope, got: {msg}"
        );
        assert!(
            msg.contains("mappings"),
            "error should mention the missing field, got: {msg}"
        );
    }

    #[test]
    fn map_with_wrong_input_shape_gives_clear_error() {
        let msg = failure_message(
            json!({
                "name": "map",
                "input": { "mappings": "not an array" }
            }),
            "map with bad mappings type should fail",
        );
        assert!(
            msg.starts_with("config for function 'map':"),
            "error should be prefixed with function envelope, got: {msg}"
        );
    }

    #[test]
    fn validation_accepts_both_spellings() {
        for name in ["validate", "validation"] {
            let document = json!({
                "name": name,
                "input": { "rules": [] }
            });
            let parsed =
                parse(document).unwrap_or_else(|e| panic!("'{name}' should deserialize: {e}"));
            assert!(matches!(parsed, FunctionConfig::Validation { .. }));
        }
    }

    #[test]
    fn unknown_name_falls_through_to_custom() {
        let document = json!({
            "name": "my_custom_handler",
            "input": { "anything": "goes" }
        });
        let parsed = parse(document).expect("unknown name should produce Custom");
        match parsed {
            FunctionConfig::Custom {
                name: tag,
                compiled_input: compiled,
                ..
            } => {
                assert_eq!(tag, "my_custom_handler");
                assert!(compiled.is_none());
            }
            other => panic!("expected Custom, got {other:?}"),
        }
    }

    #[test]
    fn missing_name_field_errors() {
        let rendered = failure_message(json!({ "input": {} }), "missing name should fail");
        assert!(rendered.contains("name"));
    }

    #[test]
    fn missing_input_field_errors() {
        let rendered = failure_message(json!({ "name": "map" }), "missing input should fail");
        assert!(rendered.contains("input"));
    }

    #[test]
    fn http_call_with_missing_connector_gives_clear_error() {
        let msg = failure_message(
            json!({
                "name": "http_call",
                "input": { "method": "GET" }
            }),
            "http_call needs connector",
        );
        assert!(
            msg.starts_with("config for function 'http_call':"),
            "error should be prefixed with function envelope, got: {msg}"
        );
        assert!(msg.contains("connector"));
    }

    #[test]
    fn builtin_names_never_fall_through_to_custom() {
        // An empty input may or may not be valid for a given built-in. Either
        // way the name must be recognised: success must not be `Custom`, and
        // failure must carry the per-function error prefix.
        for name in BUILTIN_FUNCTION_NAMES {
            let attempt = parse(json!({
                "name": name,
                "input": {}
            }));
            match attempt {
                Err(e) => assert!(
                    e.to_string()
                        .starts_with(&format!("config for function '{name}':")),
                    "name '{name}' failed without envelope: {e}"
                ),
                Ok(c) => assert!(
                    !matches!(c, FunctionConfig::Custom { .. }),
                    "name '{name}' silently fell through to Custom"
                ),
            }
        }
    }
}