Skip to main content

dataflow_rs/engine/functions/
integration.rs

1use serde::Deserialize;
2use serde_json::Value;
3use std::collections::HashMap;
4
5/// Configuration for the http_call integration function.
6///
7/// The actual HTTP implementation is provided by the service layer via AsyncFunctionHandler.
8/// This struct provides typed config validation and pre-compilation of JSONLogic expressions.
9#[derive(Debug, Clone, Deserialize)]
10pub struct HttpCallConfig {
11    /// Named connector reference (resolved by service layer)
12    pub connector: String,
13
14    /// HTTP method
15    #[serde(default = "default_method")]
16    pub method: HttpMethod,
17
18    /// Static path string
19    #[serde(default)]
20    pub path: Option<String>,
21
22    /// JSONLogic expression to compute path dynamically
23    #[serde(default)]
24    pub path_logic: Option<Value>,
25
26    /// Cache index for compiled path_logic
27    #[serde(skip)]
28    pub path_logic_index: Option<usize>,
29
30    /// Static headers
31    #[serde(default)]
32    pub headers: HashMap<String, String>,
33
34    /// Static request body
35    #[serde(default)]
36    pub body: Option<Value>,
37
38    /// JSONLogic expression to compute body dynamically
39    #[serde(default)]
40    pub body_logic: Option<Value>,
41
42    /// Cache index for compiled body_logic
43    #[serde(skip)]
44    pub body_logic_index: Option<usize>,
45
46    /// JSONPath/dot-path to extract from response and merge into context
47    #[serde(default)]
48    pub response_path: Option<String>,
49
50    /// Request timeout in milliseconds (default: 30000)
51    #[serde(default = "default_timeout")]
52    pub timeout_ms: u64,
53}
54
55/// HTTP methods supported by http_call
56#[derive(Debug, Clone, Default, Deserialize)]
57#[serde(rename_all = "UPPERCASE")]
58pub enum HttpMethod {
59    #[default]
60    Get,
61    Post,
62    Put,
63    Patch,
64    Delete,
65}
66
67fn default_method() -> HttpMethod {
68    HttpMethod::Get
69}
70
/// Serde fallback for an omitted `timeout_ms` key, in milliseconds.
fn default_timeout() -> u64 {
    // 30 seconds, expressed in milliseconds.
    const DEFAULT_TIMEOUT_MS: u64 = 30_000;
    DEFAULT_TIMEOUT_MS
}
74
75/// Configuration for the enrich integration function.
76///
77/// Enrichment calls an external service and merges the response into the message context.
78#[derive(Debug, Clone, Deserialize)]
79pub struct EnrichConfig {
80    /// Named connector reference
81    pub connector: String,
82
83    /// HTTP method for the enrichment call
84    #[serde(default = "default_method")]
85    pub method: HttpMethod,
86
87    /// Static path
88    #[serde(default)]
89    pub path: Option<String>,
90
91    /// JSONLogic expression to compute path dynamically
92    #[serde(default)]
93    pub path_logic: Option<Value>,
94
95    /// Cache index for compiled path_logic
96    #[serde(skip)]
97    pub path_logic_index: Option<usize>,
98
99    /// Dot-path where enrichment data is merged into the message context
100    pub merge_path: String,
101
102    /// Request timeout in milliseconds (default: 30000)
103    #[serde(default = "default_timeout")]
104    pub timeout_ms: u64,
105
106    /// What to do on enrichment failure
107    #[serde(default)]
108    pub on_error: EnrichErrorAction,
109}
110
111/// What to do when enrichment fails
112#[derive(Debug, Clone, Deserialize, Default)]
113#[serde(rename_all = "lowercase")]
114pub enum EnrichErrorAction {
115    /// Fail the task (default)
116    #[default]
117    Fail,
118    /// Skip enrichment and continue
119    Skip,
120}
121
122/// Configuration for the publish_kafka integration function.
123///
124/// The actual Kafka producer is provided by the service layer via AsyncFunctionHandler.
125#[derive(Debug, Clone, Deserialize)]
126pub struct PublishKafkaConfig {
127    /// Named connector reference
128    pub connector: String,
129
130    /// Target topic name
131    pub topic: String,
132
133    /// JSONLogic expression to compute the message key
134    #[serde(default)]
135    pub key_logic: Option<Value>,
136
137    /// Cache index for compiled key_logic
138    #[serde(skip)]
139    pub key_logic_index: Option<usize>,
140
141    /// JSONLogic expression to compute the message value
142    #[serde(default)]
143    pub value_logic: Option<Value>,
144
145    /// Cache index for compiled value_logic
146    #[serde(skip)]
147    pub value_logic_index: Option<usize>,
148}