dataflow_rs/engine/functions/
parse.rs1use crate::engine::error::{DataflowError, Result};
14use crate::engine::executor::ArenaContext;
15use crate::engine::message::{Change, Message};
16use crate::engine::task_outcome::TaskOutcome;
17use crate::engine::utils::{get_nested_value, set_nested_value};
18use datavalue::OwnedDataValue;
19use log::debug;
20use serde::Deserialize;
21use serde_json::Value;
22use std::sync::Arc;
23
24#[derive(Debug, Clone, Deserialize)]
26pub struct ParseConfig {
27 pub source: String,
29
30 pub target: String,
32}
33
34impl ParseConfig {
35 pub fn from_json(input: &Value) -> Result<Self> {
36 let source = input
37 .get("source")
38 .and_then(Value::as_str)
39 .ok_or_else(|| {
40 DataflowError::Validation("Missing 'source' in parse config".to_string())
41 })?
42 .to_string();
43
44 let target = input
45 .get("target")
46 .and_then(Value::as_str)
47 .ok_or_else(|| {
48 DataflowError::Validation("Missing 'target' in parse config".to_string())
49 })?
50 .to_string();
51
52 Ok(ParseConfig { source, target })
53 }
54
55 fn extract_source(&self, message: &Message) -> OwnedDataValue {
57 if self.source == "payload" {
58 (*message.payload).clone()
59 } else if let Some(path) = self.source.strip_prefix("payload.") {
60 get_nested_value(&message.payload, path)
61 .cloned()
62 .unwrap_or(OwnedDataValue::Null)
63 } else if let Some(path) = self.source.strip_prefix("data.") {
64 get_nested_value(message.data(), path)
65 .cloned()
66 .unwrap_or(OwnedDataValue::Null)
67 } else {
68 get_nested_value(&message.context, &self.source)
69 .cloned()
70 .unwrap_or(OwnedDataValue::Null)
71 }
72 }
73}
74
75pub fn execute_parse_json(
79 message: &mut Message,
80 config: &ParseConfig,
81) -> Result<(TaskOutcome, Vec<Change>)> {
82 debug!(
83 "ParseJson: Extracting from '{}' to 'data.{}'",
84 config.source, config.target
85 );
86
87 let target_path = format!("data.{}", config.target);
88
89 let payload_fast_path =
94 config.source == "payload" && !matches!(*message.payload, OwnedDataValue::String(_));
95
96 if message.capture_changes {
97 let old_value = get_nested_value(&message.context, &target_path)
98 .cloned()
99 .unwrap_or(OwnedDataValue::Null);
100
101 let source_data = if payload_fast_path {
105 (*message.payload).clone()
106 } else {
107 let raw = config.extract_source(message);
108 match &raw {
109 OwnedDataValue::String(s) => {
110 OwnedDataValue::from_json(s).unwrap_or_else(|_| raw.clone())
111 }
112 _ => raw,
113 }
114 };
115
116 let new_value = source_data.clone();
120
121 set_nested_value(&mut message.context, &target_path, source_data);
122 debug!(
123 "ParseJson: Successfully stored data to 'data.{}'",
124 config.target
125 );
126 return Ok((
127 TaskOutcome::Success,
128 vec![Change {
129 path: Arc::from(target_path),
130 old_value,
131 new_value,
132 }],
133 ));
134 }
135
136 let source_data_for_context: OwnedDataValue = if payload_fast_path {
138 (*message.payload).clone()
139 } else {
140 let raw = config.extract_source(message);
141 match &raw {
142 OwnedDataValue::String(s) => {
143 OwnedDataValue::from_json(s).unwrap_or_else(|_| raw.clone())
144 }
145 _ => raw,
146 }
147 };
148 set_nested_value(&mut message.context, &target_path, source_data_for_context);
149
150 debug!(
151 "ParseJson: Successfully stored data to 'data.{}'",
152 config.target
153 );
154
155 Ok((TaskOutcome::Success, Vec::new()))
156}
157
158pub(crate) fn execute_parse_json_in_arena(
162 message: &mut Message,
163 config: &ParseConfig,
164 arena_ctx: &mut ArenaContext<'_>,
165) -> Result<(TaskOutcome, Vec<Change>)> {
166 let target_path = format!("data.{}", config.target);
169 let result = execute_parse_json(message, config)?;
170 arena_ctx.refresh_for_path(&message.context, &target_path);
175 Ok(result)
176}
177
178pub fn execute_parse_xml(
182 message: &mut Message,
183 config: &ParseConfig,
184) -> Result<(TaskOutcome, Vec<Change>)> {
185 debug!(
186 "ParseXml: Extracting from '{}' to 'data.{}'",
187 config.source, config.target
188 );
189
190 let source_data = config.extract_source(message);
191
192 let xml_string = match &source_data {
193 OwnedDataValue::String(s) => s.clone(),
194 _ => {
195 return Err(DataflowError::Validation(format!(
196 "ParseXml: Source '{}' is not a string",
197 config.source
198 )));
199 }
200 };
201
202 let parsed_json = xml_to_json(&xml_string)?;
203 let parsed_owned = OwnedDataValue::from(&parsed_json);
204
205 let target_path = format!("data.{}", config.target);
206 let old_value = get_nested_value(&message.context, &target_path)
207 .cloned()
208 .unwrap_or(OwnedDataValue::Null);
209
210 set_nested_value(&mut message.context, &target_path, parsed_owned.clone());
211
212 debug!(
213 "ParseXml: Successfully parsed and stored XML to 'data.{}'",
214 config.target
215 );
216
217 Ok((
218 TaskOutcome::Success,
219 vec![Change {
220 path: Arc::from(target_path),
221 old_value,
222 new_value: parsed_owned,
223 }],
224 ))
225}
226
227fn xml_to_json(xml: &str) -> Result<Value> {
229 use quick_xml::de::from_str;
230
231 let parsed: Value = from_str(xml)
232 .map_err(|e| DataflowError::Validation(format!("Failed to parse XML: {}", e)))?;
233
234 Ok(parsed)
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Converts a `serde_json::Value` into the engine's owned value type.
    fn dv(v: serde_json::Value) -> OwnedDataValue {
        OwnedDataValue::from(&v)
    }

    #[test]
    fn test_parse_config_from_json() {
        let input = json!({"source": "payload", "target": "input_data"});
        let config = ParseConfig::from_json(&input).unwrap();
        assert_eq!(config.source, "payload");
        assert_eq!(config.target, "input_data");
    }

    #[test]
    fn test_parse_config_missing_source() {
        assert!(ParseConfig::from_json(&json!({"target": "input_data"})).is_err());
    }

    #[test]
    fn test_parse_config_missing_target() {
        assert!(ParseConfig::from_json(&json!({"source": "payload"})).is_err());
    }

    #[test]
    fn test_execute_parse_json_from_payload() {
        let payload = json!({"name": "John", "age": 30});
        let mut message = Message::from_value(&payload);

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "input".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        let (outcome, changes) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "data.input");

        assert_eq!(message.data()["input"]["name"], dv(json!("John")));
        assert_eq!(message.data()["input"]["age"], dv(json!(30)));
    }

    #[test]
    fn test_execute_parse_json_from_nested_payload() {
        let payload = json!({"body": {"user": {"name": "Alice"}}});
        let mut message = Message::from_value(&payload);

        let config = ParseConfig {
            source: "payload.body.user".to_string(),
            target: "user_data".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        let (outcome, _) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert_eq!(message.data()["user_data"]["name"], dv(json!("Alice")));
    }

    #[test]
    fn test_execute_parse_json_from_data() {
        let mut message = Message::new(Arc::new(dv(json!({}))));
        set_nested_value(
            &mut message.context,
            "data",
            dv(json!({"existing": {"value": 42}})),
        );

        let config = ParseConfig {
            source: "data.existing".to_string(),
            target: "copied".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        assert_eq!(message.data()["copied"]["value"], dv(json!(42)));
    }

    #[test]
    fn test_execute_parse_json_without_capture_changes() {
        let payload = json!({"name": "John"});
        let mut message = Message::from_value(&payload);
        message.capture_changes = false;

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "input".to_string(),
        };

        let (outcome, changes) = execute_parse_json(&mut message, &config).unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        // No audit records when change capture is off, but the write still happens.
        assert!(changes.is_empty());
        assert_eq!(message.data()["input"]["name"], dv(json!("John")));
    }

    #[test]
    fn test_execute_parse_json_invalid_json_string_kept_verbatim() {
        let payload = Value::String("not json".to_string());
        let mut message = Message::from_value(&payload);

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "raw".to_string(),
        };

        let (outcome, changes) = execute_parse_json(&mut message, &config).unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert_eq!(changes.len(), 1);
        // A string that fails to decode as JSON is stored unchanged.
        assert_eq!(message.data()["raw"], dv(json!("not json")));
    }

    #[test]
    fn test_execute_parse_json_records_old_and_new_values() {
        let payload = json!({"name": "John"});
        let mut message = Message::from_value(&payload);
        set_nested_value(&mut message.context, "data.input", dv(json!("old")));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "input".to_string(),
        };

        let (_, changes) = execute_parse_json(&mut message, &config).unwrap();
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "data.input");
        assert_eq!(changes[0].old_value, dv(json!("old")));
        assert_eq!(changes[0].new_value, dv(json!({"name": "John"})));
    }

    #[test]
    fn test_execute_parse_xml_simple() {
        let xml_payload = json!("<root><name>John</name><age>30</age></root>");
        let mut message = Message::from_value(&xml_payload);

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "parsed".to_string(),
        };

        let result = execute_parse_xml(&mut message, &config);
        assert!(result.is_ok());

        let (outcome, _) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);

        let parsed = &message.data()["parsed"];
        assert!(parsed.is_object());
    }

    #[test]
    fn test_execute_parse_xml_not_string() {
        let payload = json!({"not": "a string"});
        let mut message = Message::from_value(&payload);

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "parsed".to_string(),
        };

        assert!(execute_parse_xml(&mut message, &config).is_err());
    }

    #[test]
    fn test_xml_to_json_simple() {
        let xml = "<root><name>Test</name></root>";
        let result = xml_to_json(xml);
        assert!(result.is_ok());
        let json = result.unwrap();
        assert!(json.is_object());
    }

    #[test]
    fn test_xml_to_json_invalid() {
        let xml = "<root><unclosed>";
        assert!(xml_to_json(xml).is_err());
    }

    #[test]
    fn test_xml_to_json_with_attributes() {
        let xml = r#"<person id="123"><name>John</name></person>"#;
        assert!(xml_to_json(xml).is_ok());
    }

    #[test]
    fn test_xml_to_json_nested() {
        let xml = r#"<root><user><name>Alice</name><email>alice@example.com</email></user></root>"#;
        let result = xml_to_json(xml);
        assert!(result.is_ok());
        let json = result.unwrap();
        assert!(json.is_object());
    }

    #[test]
    fn test_execute_parse_json_from_string_payload() {
        let payload = Value::String(r#"{"name":"John","age":30}"#.to_string());
        let mut message = Message::from_value(&payload);

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "input".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        let (outcome, _) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);

        assert_eq!(message.data()["input"]["name"], dv(json!("John")));
        assert_eq!(message.data()["input"]["age"], dv(json!(30)));
    }
}