dataflow_rs/engine/functions/
parse.rs1use crate::engine::error::{DataflowError, Result};
27use crate::engine::message::{Change, Message};
28use crate::engine::utils::get_nested_value;
29use log::debug;
30use serde::Deserialize;
31use serde_json::Value;
32use std::sync::Arc;
33
34#[derive(Debug, Clone, Deserialize)]
39pub struct ParseConfig {
40 pub source: String,
45
46 pub target: String,
49}
50
51impl ParseConfig {
52 pub fn from_json(input: &Value) -> Result<Self> {
60 let source = input
61 .get("source")
62 .and_then(Value::as_str)
63 .ok_or_else(|| {
64 DataflowError::Validation("Missing 'source' in parse config".to_string())
65 })?
66 .to_string();
67
68 let target = input
69 .get("target")
70 .and_then(Value::as_str)
71 .ok_or_else(|| {
72 DataflowError::Validation("Missing 'target' in parse config".to_string())
73 })?
74 .to_string();
75
76 Ok(ParseConfig { source, target })
77 }
78
79 fn extract_source(&self, message: &Message) -> Value {
87 if self.source == "payload" {
88 (*message.payload).clone()
89 } else if let Some(path) = self.source.strip_prefix("payload.") {
90 get_nested_value(&message.payload, path)
91 .cloned()
92 .unwrap_or(Value::Null)
93 } else if let Some(path) = self.source.strip_prefix("data.") {
94 get_nested_value(message.data(), path)
95 .cloned()
96 .unwrap_or(Value::Null)
97 } else {
98 get_nested_value(&message.context, &self.source)
100 .cloned()
101 .unwrap_or(Value::Null)
102 }
103 }
104}
105
106pub fn execute_parse_json(
119 message: &mut Message,
120 config: &ParseConfig,
121) -> Result<(usize, Vec<Change>)> {
122 debug!(
123 "ParseJson: Extracting from '{}' to 'data.{}'",
124 config.source, config.target
125 );
126
127 let source_data = config.extract_source(message);
129
130 let source_data = match &source_data {
132 Value::String(s) => serde_json::from_str(s).unwrap_or(source_data),
133 _ => source_data,
134 };
135
136 let old_value = message
138 .data()
139 .get(&config.target)
140 .cloned()
141 .unwrap_or(Value::Null);
142
143 if let Some(data_obj) = message.data_mut().as_object_mut() {
145 data_obj.insert(config.target.clone(), source_data.clone());
146 }
147
148 message.invalidate_context_cache();
150
151 debug!(
152 "ParseJson: Successfully stored data to 'data.{}'",
153 config.target
154 );
155
156 Ok((
157 200,
158 vec![Change {
159 path: Arc::from(format!("data.{}", config.target)),
160 old_value: Arc::new(old_value),
161 new_value: Arc::new(source_data),
162 }],
163 ))
164}
165
166pub fn execute_parse_xml(
179 message: &mut Message,
180 config: &ParseConfig,
181) -> Result<(usize, Vec<Change>)> {
182 debug!(
183 "ParseXml: Extracting from '{}' to 'data.{}'",
184 config.source, config.target
185 );
186
187 let source_data = config.extract_source(message);
189
190 let xml_string = match &source_data {
192 Value::String(s) => s.clone(),
193 _ => {
194 return Err(DataflowError::Validation(format!(
195 "ParseXml: Source '{}' is not a string",
196 config.source
197 )));
198 }
199 };
200
201 let parsed_json = xml_to_json(&xml_string)?;
203
204 let old_value = message
206 .data()
207 .get(&config.target)
208 .cloned()
209 .unwrap_or(Value::Null);
210
211 if let Some(data_obj) = message.data_mut().as_object_mut() {
213 data_obj.insert(config.target.clone(), parsed_json.clone());
214 }
215
216 message.invalidate_context_cache();
218
219 debug!(
220 "ParseXml: Successfully parsed and stored XML to 'data.{}'",
221 config.target
222 );
223
224 Ok((
225 200,
226 vec![Change {
227 path: Arc::from(format!("data.{}", config.target)),
228 old_value: Arc::new(old_value),
229 new_value: Arc::new(parsed_json),
230 }],
231 ))
232}
233
234fn xml_to_json(xml: &str) -> Result<Value> {
243 use quick_xml::de::from_str;
244
245 let parsed: Value = from_str(xml)
247 .map_err(|e| DataflowError::Validation(format!("Failed to parse XML: {}", e)))?;
248
249 Ok(parsed)
250}
251
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Wraps `payload` in a fresh message.
    fn message_with(payload: Value) -> Message {
        Message::new(Arc::new(payload))
    }

    /// Builds a config directly, bypassing `ParseConfig::from_json`.
    fn cfg(source: &str, target: &str) -> ParseConfig {
        ParseConfig {
            source: source.to_string(),
            target: target.to_string(),
        }
    }

    // ---- ParseConfig::from_json ------------------------------------------

    #[test]
    fn test_parse_config_from_json() {
        let config = ParseConfig::from_json(&json!({
            "source": "payload",
            "target": "input_data"
        }))
        .unwrap();

        assert_eq!(config.source, "payload");
        assert_eq!(config.target, "input_data");
    }

    #[test]
    fn test_parse_config_missing_source() {
        let input = json!({ "target": "input_data" });
        assert!(ParseConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_parse_config_missing_target() {
        let input = json!({ "source": "payload" });
        assert!(ParseConfig::from_json(&input).is_err());
    }

    // ---- execute_parse_json ----------------------------------------------

    #[test]
    fn test_execute_parse_json_from_payload() {
        let mut message = message_with(json!({ "name": "John", "age": 30 }));

        let (status, changes) = execute_parse_json(&mut message, &cfg("payload", "input"))
            .expect("parsing an object payload should succeed");

        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "data.input");

        let stored = &message.data()["input"];
        assert_eq!(stored["name"], json!("John"));
        assert_eq!(stored["age"], json!(30));
    }

    #[test]
    fn test_execute_parse_json_from_nested_payload() {
        let mut message = message_with(json!({
            "body": { "user": { "name": "Alice" } }
        }));

        let (status, _) =
            execute_parse_json(&mut message, &cfg("payload.body.user", "user_data"))
                .expect("nested payload path should resolve");

        assert_eq!(status, 200);
        assert_eq!(message.data()["user_data"]["name"], json!("Alice"));
    }

    #[test]
    fn test_execute_parse_json_from_data() {
        let mut message = message_with(json!({}));
        message.context["data"] = json!({ "existing": { "value": 42 } });

        execute_parse_json(&mut message, &cfg("data.existing", "copied"))
            .expect("data path should resolve");

        assert_eq!(message.data()["copied"]["value"], json!(42));
    }

    #[test]
    fn test_execute_parse_json_from_string_payload() {
        let raw = r#"{"name":"John","age":30}"#;
        let mut message = message_with(Value::String(raw.to_string()));

        let (status, _) = execute_parse_json(&mut message, &cfg("payload", "input"))
            .expect("a JSON string payload should be decoded");

        assert_eq!(status, 200);
        let stored = &message.data()["input"];
        assert_eq!(stored["name"], json!("John"));
        assert_eq!(stored["age"], json!(30));
    }

    // ---- execute_parse_xml -----------------------------------------------

    #[test]
    fn test_execute_parse_xml_simple() {
        let mut message =
            message_with(json!("<root><name>John</name><age>30</age></root>"));

        let (status, _) = execute_parse_xml(&mut message, &cfg("payload", "parsed"))
            .expect("well-formed XML should parse");

        assert_eq!(status, 200);
        assert!(message.data()["parsed"].is_object());
    }

    #[test]
    fn test_execute_parse_xml_not_string() {
        let mut message = message_with(json!({ "not": "a string" }));
        let outcome = execute_parse_xml(&mut message, &cfg("payload", "parsed"));
        assert!(outcome.is_err());
    }

    // ---- xml_to_json -----------------------------------------------------

    #[test]
    fn test_xml_to_json_simple() {
        let value = xml_to_json("<root><name>Test</name></root>").expect("valid XML");
        assert!(value.is_object());
    }

    #[test]
    fn test_xml_to_json_invalid() {
        // The inner element is never closed.
        assert!(xml_to_json("<root><unclosed>").is_err());
    }

    #[test]
    fn test_xml_to_json_with_attributes() {
        assert!(xml_to_json(r#"<person id="123"><name>John</name></person>"#).is_ok());
    }

    #[test]
    fn test_xml_to_json_nested() {
        let value = xml_to_json(
            r#"<root><user><name>Alice</name><email>alice@example.com</email></user></root>"#,
        )
        .expect("nested XML should parse");
        assert!(value.is_object());
    }
}