Skip to main content

dataflow_rs/engine/functions/
integration.rs

1use datalogic_rs::Logic;
2use serde::Deserialize;
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7/// Configuration for the http_call integration function.
8///
9/// The actual HTTP implementation is provided by the service layer via AsyncFunctionHandler.
10/// This struct provides typed config validation and pre-compilation of JSONLogic expressions.
11#[derive(Debug, Clone, Deserialize)]
12pub struct HttpCallConfig {
13    /// Named connector reference (resolved by service layer)
14    pub connector: String,
15
16    /// HTTP method
17    #[serde(default = "default_method")]
18    pub method: HttpMethod,
19
20    /// Static path string
21    #[serde(default)]
22    pub path: Option<String>,
23
24    /// JSONLogic expression to compute path dynamically
25    #[serde(default)]
26    pub path_logic: Option<Value>,
27
28    /// Pre-compiled `path_logic`, populated by `LogicCompiler`.
29    #[serde(skip)]
30    pub compiled_path_logic: Option<Arc<Logic>>,
31
32    /// Static headers
33    #[serde(default)]
34    pub headers: HashMap<String, String>,
35
36    /// Static request body
37    #[serde(default)]
38    pub body: Option<Value>,
39
40    /// JSONLogic expression to compute body dynamically
41    #[serde(default)]
42    pub body_logic: Option<Value>,
43
44    /// Pre-compiled `body_logic`, populated by `LogicCompiler`.
45    #[serde(skip)]
46    pub compiled_body_logic: Option<Arc<Logic>>,
47
48    /// JSONPath/dot-path to extract from response and merge into context
49    #[serde(default)]
50    pub response_path: Option<String>,
51
52    /// Request timeout in milliseconds (default: 30000)
53    #[serde(default = "default_timeout")]
54    pub timeout_ms: u64,
55}
56
57/// HTTP methods supported by http_call
58#[derive(Debug, Clone, Default, Deserialize)]
59#[serde(rename_all = "UPPERCASE")]
60pub enum HttpMethod {
61    #[default]
62    Get,
63    Post,
64    Put,
65    Patch,
66    Delete,
67}
68
69fn default_method() -> HttpMethod {
70    HttpMethod::Get
71}
72
73fn default_timeout() -> u64 {
74    30000
75}
76
77/// Configuration for the enrich integration function.
78///
79/// Enrichment calls an external service and merges the response into the message context.
80#[derive(Debug, Clone, Deserialize)]
81pub struct EnrichConfig {
82    /// Named connector reference
83    pub connector: String,
84
85    /// HTTP method for the enrichment call
86    #[serde(default = "default_method")]
87    pub method: HttpMethod,
88
89    /// Static path
90    #[serde(default)]
91    pub path: Option<String>,
92
93    /// JSONLogic expression to compute path dynamically
94    #[serde(default)]
95    pub path_logic: Option<Value>,
96
97    /// Pre-compiled `path_logic`, populated by `LogicCompiler`.
98    #[serde(skip)]
99    pub compiled_path_logic: Option<Arc<Logic>>,
100
101    /// Dot-path where enrichment data is merged into the message context
102    pub merge_path: String,
103
104    /// Request timeout in milliseconds (default: 30000)
105    #[serde(default = "default_timeout")]
106    pub timeout_ms: u64,
107
108    /// What to do on enrichment failure
109    #[serde(default)]
110    pub on_error: EnrichErrorAction,
111}
112
113/// What to do when enrichment fails
114#[derive(Debug, Clone, Deserialize, Default)]
115#[serde(rename_all = "lowercase")]
116pub enum EnrichErrorAction {
117    /// Fail the task (default)
118    #[default]
119    Fail,
120    /// Skip enrichment and continue
121    Skip,
122}
123
124/// Configuration for the publish_kafka integration function.
125///
126/// The actual Kafka producer is provided by the service layer via AsyncFunctionHandler.
127#[derive(Debug, Clone, Deserialize)]
128pub struct PublishKafkaConfig {
129    /// Named connector reference
130    pub connector: String,
131
132    /// Target topic name
133    pub topic: String,
134
135    /// JSONLogic expression to compute the message key
136    #[serde(default)]
137    pub key_logic: Option<Value>,
138
139    /// Pre-compiled `key_logic`, populated by `LogicCompiler`.
140    #[serde(skip)]
141    pub compiled_key_logic: Option<Arc<Logic>>,
142
143    /// JSONLogic expression to compute the message value
144    #[serde(default)]
145    pub value_logic: Option<Value>,
146
147    /// Pre-compiled `value_logic`, populated by `LogicCompiler`.
148    #[serde(skip)]
149    pub compiled_value_logic: Option<Arc<Logic>>,
150}