Skip to main content

dataflow_rs/engine/functions/
config.rs

1use crate::engine::error::Result;
2use crate::engine::executor::ArenaContext;
3use crate::engine::functions::filter::FilterConfig;
4use crate::engine::functions::integration::{EnrichConfig, HttpCallConfig, PublishKafkaConfig};
5use crate::engine::functions::log::LogConfig;
6use crate::engine::functions::map::MapConfig;
7use crate::engine::functions::parse::{
8    ParseConfig, execute_parse_json_in_arena, execute_parse_xml,
9};
10use crate::engine::functions::publish::{PublishConfig, execute_publish_json, execute_publish_xml};
11use crate::engine::functions::validation::ValidationConfig;
12use crate::engine::message::{Change, Message};
13use crate::engine::task_outcome::TaskOutcome;
14use datalogic_rs::Engine;
15use serde::de::DeserializeOwned;
16use serde::{Deserialize, Deserializer};
17use serde_json::Value;
18use std::any::Any;
19use std::sync::Arc;
20
/// Typed input for a `FunctionConfig::Custom` task, parsed ahead of time.
///
/// The engine fills this in at `Engine::new()` time by invoking the
/// registered `AsyncFunctionHandler::parse_input` for the named function.
/// The value is stored as `Arc<dyn Any>`, so the dispatch path needs only a
/// single `downcast_ref` to recover the concrete type and pays no
/// per-message deserialization cost.
///
/// A dedicated wrapper is needed because `dyn Any` has no `Debug` impl,
/// which would otherwise block `#[derive(Debug)]` on `FunctionConfig`.
#[derive(Clone)]
pub struct CompiledCustomInput(pub Arc<dyn Any + Send + Sync>);

impl CompiledCustomInput {
    /// Expose the wrapped value as `&(dyn Any + Send + Sync)`, ready to be
    /// handed to `DynAsyncFunctionHandler::dyn_execute`.
    #[inline]
    pub fn as_any(&self) -> &(dyn Any + Send + Sync) {
        self.0.as_ref()
    }
}

impl std::fmt::Debug for CompiledCustomInput {
    /// Prints a fixed placeholder: the payload is type-erased, so there is
    /// nothing meaningful to show for it.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(out, "CompiledCustomInput(<opaque>)")
    }
}
46
/// Enum containing all possible function configurations.
///
/// Deserialization dispatches on the `name` field: each known built-in
/// (`map`, `validate`, `parse_json`, …) parses its `input` strictly into the
/// matching typed config and errors with a clear envelope (`config for
/// function 'map': missing field 'mappings'`). Unknown names fall through
/// to [`FunctionConfig::Custom`], which preserves the raw input for a
/// user-registered handler to consume at engine construction time.
///
/// Every built-in variant pairs a `name` tag (recording the spelling that
/// selected it) with the typed `input` parsed for that function.
#[derive(Debug, Clone)]
pub enum FunctionConfig {
    /// Built-in selected by the name `"map"`. Runs on the sync arena path.
    Map {
        name: MapName,
        input: MapConfig,
    },
    /// Built-in selected by either `"validate"` or `"validation"`; the tag
    /// records which spelling was used. Runs on the sync arena path.
    Validation {
        name: ValidationName,
        input: ValidationConfig,
    },
    /// Built-in selected by the name `"parse_json"`. Runs on the sync arena
    /// path.
    ParseJson {
        name: ParseJsonName,
        input: ParseConfig,
    },
    /// Built-in selected by the name `"parse_xml"`. Shares `ParseConfig`
    /// with `ParseJson`. Runs on the sync arena path.
    ParseXml {
        name: ParseXmlName,
        input: ParseConfig,
    },
    /// Built-in selected by the name `"publish_json"`. Runs on the sync
    /// arena path.
    PublishJson {
        name: PublishJsonName,
        input: PublishConfig,
    },
    /// Built-in selected by the name `"publish_xml"`. Shares `PublishConfig`
    /// with `PublishJson`. Runs on the sync arena path.
    PublishXml {
        name: PublishXmlName,
        input: PublishConfig,
    },
    /// Built-in selected by the name `"filter"`. Runs on the sync arena path.
    Filter {
        name: FilterName,
        input: FilterConfig,
    },
    /// Built-in selected by the name `"log"`. Runs on the sync arena path.
    Log {
        name: LogName,
        input: LogConfig,
    },
    /// Built-in selected by the name `"http_call"`. Not a sync built-in:
    /// `try_execute_in_arena` returns `None` for it.
    HttpCall {
        name: HttpCallName,
        input: HttpCallConfig,
    },
    /// Built-in selected by the name `"enrich"`. Not a sync built-in:
    /// `try_execute_in_arena` returns `None` for it.
    Enrich {
        name: EnrichName,
        input: EnrichConfig,
    },
    /// Built-in selected by the name `"publish_kafka"`. Not a sync built-in:
    /// `try_execute_in_arena` returns `None` for it.
    PublishKafka {
        name: PublishKafkaName,
        input: PublishKafkaConfig,
    },
    /// For custom or unknown functions, store raw input and a slot for the
    /// pre-parsed typed value populated at engine construction time.
    Custom {
        /// The function name exactly as it appeared in the config.
        name: String,
        /// The unparsed `input` value, kept verbatim for the handler.
        input: Value,
        /// Pre-parsed `<RegisteredHandler as AsyncFunctionHandler>::Input`,
        /// boxed as `dyn Any`. Set by the engine after handler registration;
        /// `None` on initial deserialization. `FunctionConfig` is
        /// deserialize-only — round-tripping a workflow through JSON
        /// re-parses on the next `Engine::new()` call.
        compiled_input: Option<CompiledCustomInput>,
    },
}
114
115#[derive(Debug, Clone, Deserialize)]
116#[serde(rename_all = "lowercase")]
117pub enum MapName {
118    Map,
119}
120
/// Name tag carried by `FunctionConfig::Validation`. Two spellings are
/// accepted; with serde's `lowercase` renaming they are `"validation"` and
/// `"validate"`, and the variant records which one was used.
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum ValidationName {
    Validation,
    Validate,
}
127
/// Name tag carried by `FunctionConfig::ParseJson`. With serde's
/// `snake_case` renaming its JSON spelling is `"parse_json"`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ParseJsonName {
    ParseJson,
}
133
/// Name tag carried by `FunctionConfig::ParseXml`. With serde's
/// `snake_case` renaming its JSON spelling is `"parse_xml"`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ParseXmlName {
    ParseXml,
}
139
/// Name tag carried by `FunctionConfig::PublishJson`. With serde's
/// `snake_case` renaming its JSON spelling is `"publish_json"`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum PublishJsonName {
    PublishJson,
}
145
/// Name tag carried by `FunctionConfig::PublishXml`. With serde's
/// `snake_case` renaming its JSON spelling is `"publish_xml"`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum PublishXmlName {
    PublishXml,
}
151
/// Name tag carried by `FunctionConfig::Filter`. With serde's `lowercase`
/// renaming its JSON spelling is `"filter"`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum FilterName {
    Filter,
}
157
/// Name tag carried by `FunctionConfig::Log`. With serde's `lowercase`
/// renaming its JSON spelling is `"log"`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum LogName {
    Log,
}
163
/// Name tag carried by `FunctionConfig::HttpCall`. With serde's
/// `snake_case` renaming its JSON spelling is `"http_call"`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum HttpCallName {
    HttpCall,
}
169
/// Name tag carried by `FunctionConfig::Enrich`. With serde's `snake_case`
/// renaming its JSON spelling is `"enrich"`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum EnrichName {
    Enrich,
}
175
/// Name tag carried by `FunctionConfig::PublishKafka`. With serde's
/// `snake_case` renaming its JSON spelling is `"publish_kafka"`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum PublishKafkaName {
    PublishKafka,
}
181
/// Names of the built-in function variants, including both accepted
/// spellings of the validation built-in (`"validation"` and `"validate"`).
///
/// NOTE(review): the dispatch in `FunctionConfig`'s `Deserialize` impl
/// matches on its own string literals rather than reading this list, so a
/// new built-in has to be added in both places. The
/// `builtin_names_never_fall_through_to_custom` test checks that every name
/// listed here is handled by that dispatch.
///
/// Presumably also used for error-message suggestions elsewhere in the
/// crate — that use is not visible in this file; confirm before relying
/// on it.
pub(crate) const BUILTIN_FUNCTION_NAMES: &[&str] = &[
    "map",
    "validation",
    "validate",
    "parse_json",
    "parse_xml",
    "publish_json",
    "publish_xml",
    "filter",
    "log",
    "http_call",
    "enrich",
    "publish_kafka",
];
200
201/// Parse a `serde_json::Value` into a typed config, wrapping any error in a
202/// "config for function '<func>': …" envelope. Strips the trailing
203/// `" at line 0 column 0"` that `serde_json::from_value` always appends
204/// (since the source `Value` has no source-text location); the outer
205/// deserializer re-attaches the real source location when this error
206/// bubbles up to e.g. `Workflow::from_json`.
207fn parse_function_input<T, E>(func: &str, input: Value) -> std::result::Result<T, E>
208where
209    T: DeserializeOwned,
210    E: serde::de::Error,
211{
212    serde_json::from_value::<T>(input).map_err(|err| {
213        let raw = err.to_string();
214        let trimmed = raw
215            .rsplit_once(" at line ")
216            .map(|(head, _)| head)
217            .unwrap_or(&raw);
218        E::custom(format!("config for function '{func}': {trimmed}"))
219    })
220}
221
222impl<'de> Deserialize<'de> for FunctionConfig {
223    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
224    where
225        D: Deserializer<'de>,
226    {
227        // Tag-only intermediate. Format-agnostic: works for any deserializer
228        // that produces `String`/`serde_json::Value`. The strict typed parse
229        // happens in the dispatch below.
230        #[derive(Deserialize)]
231        struct Raw {
232            name: String,
233            input: Value,
234        }
235
236        let Raw { name, input } = Raw::deserialize(deserializer)?;
237
238        Ok(match name.as_str() {
239            "map" => FunctionConfig::Map {
240                name: MapName::Map,
241                input: parse_function_input("map", input)?,
242            },
243            "validate" => FunctionConfig::Validation {
244                name: ValidationName::Validate,
245                input: parse_function_input("validate", input)?,
246            },
247            "validation" => FunctionConfig::Validation {
248                name: ValidationName::Validation,
249                input: parse_function_input("validation", input)?,
250            },
251            "parse_json" => FunctionConfig::ParseJson {
252                name: ParseJsonName::ParseJson,
253                input: parse_function_input("parse_json", input)?,
254            },
255            "parse_xml" => FunctionConfig::ParseXml {
256                name: ParseXmlName::ParseXml,
257                input: parse_function_input("parse_xml", input)?,
258            },
259            "publish_json" => FunctionConfig::PublishJson {
260                name: PublishJsonName::PublishJson,
261                input: parse_function_input("publish_json", input)?,
262            },
263            "publish_xml" => FunctionConfig::PublishXml {
264                name: PublishXmlName::PublishXml,
265                input: parse_function_input("publish_xml", input)?,
266            },
267            "filter" => FunctionConfig::Filter {
268                name: FilterName::Filter,
269                input: parse_function_input("filter", input)?,
270            },
271            "log" => FunctionConfig::Log {
272                name: LogName::Log,
273                input: parse_function_input("log", input)?,
274            },
275            "http_call" => FunctionConfig::HttpCall {
276                name: HttpCallName::HttpCall,
277                input: parse_function_input("http_call", input)?,
278            },
279            "enrich" => FunctionConfig::Enrich {
280                name: EnrichName::Enrich,
281                input: parse_function_input("enrich", input)?,
282            },
283            "publish_kafka" => FunctionConfig::PublishKafka {
284                name: PublishKafkaName::PublishKafka,
285                input: parse_function_input("publish_kafka", input)?,
286            },
287            _ => FunctionConfig::Custom {
288                name,
289                input,
290                compiled_input: None,
291            },
292        })
293    }
294}
295
296impl FunctionConfig {
297    /// Get the function name for this configuration
298    pub fn function_name(&self) -> &str {
299        match self {
300            FunctionConfig::Map { .. } => "map",
301            FunctionConfig::Validation { .. } => "validate",
302            FunctionConfig::ParseJson { .. } => "parse_json",
303            FunctionConfig::ParseXml { .. } => "parse_xml",
304            FunctionConfig::PublishJson { .. } => "publish_json",
305            FunctionConfig::PublishXml { .. } => "publish_xml",
306            FunctionConfig::Filter { .. } => "filter",
307            FunctionConfig::Log { .. } => "log",
308            FunctionConfig::HttpCall { .. } => "http_call",
309            FunctionConfig::Enrich { .. } => "enrich",
310            FunctionConfig::PublishKafka { .. } => "publish_kafka",
311            FunctionConfig::Custom { name, .. } => name,
312        }
313    }
314
315    /// Whether this is a synchronous built-in. Synchronous built-ins can share
316    /// a single `ArenaContext` lifetime across consecutive tasks within a
317    /// workflow without crossing any `.await` point.
318    ///
319    /// Must match the variants handled in [`Self::try_execute_in_arena`]; the
320    /// debug assertion below ties the two together so they can't drift.
321    pub fn is_sync_builtin(&self) -> bool {
322        matches!(
323            self,
324            FunctionConfig::Map { .. }
325                | FunctionConfig::Validation { .. }
326                | FunctionConfig::ParseJson { .. }
327                | FunctionConfig::ParseXml { .. }
328                | FunctionConfig::PublishJson { .. }
329                | FunctionConfig::PublishXml { .. }
330                | FunctionConfig::Filter { .. }
331                | FunctionConfig::Log { .. }
332        )
333    }
334
335    /// If this config is a sync built-in, execute it against the supplied
336    /// arena context and return `Some(result)`. Otherwise return `None` —
337    /// the workflow executor uses that as the signal to break the sync
338    /// stretch and dispatch the task on the async path instead.
339    ///
340    /// `map_snapshot_buf` is only consulted by the `Map` variant — when
341    /// `Some`, the map function pushes a `serde_json::Value` snapshot of the
342    /// context before each mapping (for the trace surface). All other
343    /// variants ignore it. Pass `None` from the production path.
344    ///
345    /// This is the single source of truth for the sync-stretch dispatch:
346    /// adding a new sync built-in only requires adding an arm here (and the
347    /// matching variant to `is_sync_builtin` above).
348    pub(crate) fn try_execute_in_arena(
349        &self,
350        message: &mut Message,
351        arena_ctx: &mut ArenaContext<'_>,
352        engine: &Arc<Engine>,
353        map_snapshot_buf: Option<&mut Vec<Value>>,
354    ) -> Option<Result<(TaskOutcome, Vec<Change>)>> {
355        match self {
356            FunctionConfig::Map { input, .. } => {
357                Some(input.execute_in_arena(message, arena_ctx, engine, map_snapshot_buf))
358            }
359            FunctionConfig::Validation { input, .. } => {
360                Some(input.execute_in_arena(message, arena_ctx, engine))
361            }
362            FunctionConfig::ParseJson { input, .. } => {
363                Some(execute_parse_json_in_arena(message, input, arena_ctx))
364            }
365            FunctionConfig::ParseXml { input, .. } => {
366                // Refresh the arena only on success; on error the arena cache
367                // is still in sync with the unchanged context.
368                Some(match execute_parse_xml(message, input) {
369                    Ok(r) => {
370                        arena_ctx.refresh_for_path(&message.context, "data");
371                        Ok(r)
372                    }
373                    Err(e) => Err(e),
374                })
375            }
376            FunctionConfig::PublishJson { input, .. } => {
377                // publish writes to `data.<target>` but goes through
378                // `set_nested_value` on the owned context — refresh the
379                // arena slot afterwards so the next task in the stretch
380                // observes the new value.
381                Some(match execute_publish_json(message, input) {
382                    Ok(r) => {
383                        arena_ctx.refresh_for_path(&message.context, "data");
384                        Ok(r)
385                    }
386                    Err(e) => Err(e),
387                })
388            }
389            FunctionConfig::PublishXml { input, .. } => {
390                Some(match execute_publish_xml(message, input) {
391                    Ok(r) => {
392                        arena_ctx.refresh_for_path(&message.context, "data");
393                        Ok(r)
394                    }
395                    Err(e) => Err(e),
396                })
397            }
398            FunctionConfig::Filter { input, .. } => {
399                Some(input.execute_in_arena(message, arena_ctx, engine))
400            }
401            FunctionConfig::Log { input, .. } => {
402                Some(input.execute_in_arena(message, arena_ctx, engine))
403            }
404            FunctionConfig::HttpCall { .. }
405            | FunctionConfig::Enrich { .. }
406            | FunctionConfig::PublishKafka { .. }
407            | FunctionConfig::Custom { .. } => None,
408        }
409    }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Deserialize a JSON value into a `FunctionConfig`.
    fn parse(value: serde_json::Value) -> std::result::Result<FunctionConfig, serde_json::Error> {
        serde_json::from_value(value)
    }

    /// Deserialize a value that is expected to fail and return the error
    /// text; `why` is the panic message used if it unexpectedly succeeds.
    fn parse_err(value: serde_json::Value, why: &str) -> String {
        parse(value).expect_err(why).to_string()
    }

    /// Assert that `msg` starts with the "config for function '<func>':"
    /// envelope.
    fn assert_envelope(msg: &str, func: &str) {
        let prefix = format!("config for function '{func}':");
        assert!(
            msg.starts_with(&prefix),
            "error should be prefixed with function envelope, got: {msg}"
        );
    }

    #[test]
    fn map_with_valid_config_deserializes_to_map_variant() {
        let config_json = json!({
            "name": "map",
            "input": {
                "mappings": [
                    { "path": "data.x", "logic": { "var": "data.y" } }
                ]
            }
        });
        let cfg = parse(config_json).expect("valid map config should deserialize");
        assert!(matches!(cfg, FunctionConfig::Map { .. }));
    }

    #[test]
    fn map_with_missing_mappings_gives_clear_error() {
        let msg = parse_err(
            json!({ "name": "map", "input": {} }),
            "map with empty input should fail",
        );
        assert_envelope(&msg, "map");
        assert!(
            msg.contains("mappings"),
            "error should mention the missing field, got: {msg}"
        );
    }

    #[test]
    fn map_with_wrong_input_shape_gives_clear_error() {
        let msg = parse_err(
            json!({ "name": "map", "input": { "mappings": "not an array" } }),
            "map with bad mappings type should fail",
        );
        assert_envelope(&msg, "map");
    }

    #[test]
    fn validation_accepts_both_spellings() {
        for name in ["validate", "validation"] {
            let parsed = parse(json!({
                "name": name,
                "input": { "rules": [] }
            }));
            let cfg = match parsed {
                Ok(cfg) => cfg,
                Err(e) => panic!("'{name}' should deserialize: {e}"),
            };
            assert!(matches!(cfg, FunctionConfig::Validation { .. }));
        }
    }

    #[test]
    fn unknown_name_falls_through_to_custom() {
        let cfg = parse(json!({
            "name": "my_custom_handler",
            "input": { "anything": "goes" }
        }))
        .expect("unknown name should produce Custom");

        if let FunctionConfig::Custom {
            name,
            compiled_input,
            ..
        } = cfg
        {
            assert_eq!(name, "my_custom_handler");
            assert!(compiled_input.is_none());
        } else {
            panic!("expected Custom, got {cfg:?}");
        }
    }

    #[test]
    fn missing_name_field_errors() {
        let msg = parse_err(json!({ "input": {} }), "missing name should fail");
        assert!(msg.contains("name"));
    }

    #[test]
    fn missing_input_field_errors() {
        let msg = parse_err(json!({ "name": "map" }), "missing input should fail");
        assert!(msg.contains("input"));
    }

    #[test]
    fn http_call_with_missing_connector_gives_clear_error() {
        let msg = parse_err(
            json!({ "name": "http_call", "input": { "method": "GET" } }),
            "http_call needs connector",
        );
        assert_envelope(&msg, "http_call");
        assert!(msg.contains("connector"));
    }

    #[test]
    fn builtin_names_never_fall_through_to_custom() {
        // Each entry of BUILTIN_FUNCTION_NAMES has to be claimed by the
        // dispatch: it may parse, or it may fail with the envelope, but it
        // must never end up as Custom.
        for name in BUILTIN_FUNCTION_NAMES {
            let outcome = parse(json!({
                "name": name,
                "input": {}
            }));
            match outcome {
                Err(e) => {
                    let expected_prefix = format!("config for function '{name}':");
                    assert!(
                        e.to_string().starts_with(&expected_prefix),
                        "name '{name}' failed without envelope: {e}"
                    );
                }
                Ok(c) => assert!(
                    !matches!(c, FunctionConfig::Custom { .. }),
                    "name '{name}' silently fell through to Custom"
                ),
            }
        }
    }
}