dataflow_rs/engine/functions/
parse.rs1use crate::engine::error::{DataflowError, Result};
27use crate::engine::message::{Change, Message};
28use crate::engine::utils::get_nested_value;
29use log::debug;
30use serde::Deserialize;
31use serde_json::Value;
32use std::sync::Arc;
33
34#[derive(Debug, Clone, Deserialize)]
39pub struct ParseConfig {
40 pub source: String,
45
46 pub target: String,
49}
50
51impl ParseConfig {
52 pub fn from_json(input: &Value) -> Result<Self> {
60 let source = input
61 .get("source")
62 .and_then(Value::as_str)
63 .ok_or_else(|| {
64 DataflowError::Validation("Missing 'source' in parse config".to_string())
65 })?
66 .to_string();
67
68 let target = input
69 .get("target")
70 .and_then(Value::as_str)
71 .ok_or_else(|| {
72 DataflowError::Validation("Missing 'target' in parse config".to_string())
73 })?
74 .to_string();
75
76 Ok(ParseConfig { source, target })
77 }
78
79 fn extract_source(&self, message: &Message) -> Value {
87 if self.source == "payload" {
88 (*message.payload).clone()
89 } else if let Some(path) = self.source.strip_prefix("payload.") {
90 get_nested_value(&message.payload, path)
91 .cloned()
92 .unwrap_or(Value::Null)
93 } else if let Some(path) = self.source.strip_prefix("data.") {
94 get_nested_value(message.data(), path)
95 .cloned()
96 .unwrap_or(Value::Null)
97 } else {
98 get_nested_value(&message.context, &self.source)
100 .cloned()
101 .unwrap_or(Value::Null)
102 }
103 }
104}
105
106pub fn execute_parse_json(
119 message: &mut Message,
120 config: &ParseConfig,
121) -> Result<(usize, Vec<Change>)> {
122 debug!(
123 "ParseJson: Extracting from '{}' to 'data.{}'",
124 config.source, config.target
125 );
126
127 let source_data = config.extract_source(message);
129
130 let old_value = message
132 .data()
133 .get(&config.target)
134 .cloned()
135 .unwrap_or(Value::Null);
136
137 if let Some(data_obj) = message.data_mut().as_object_mut() {
139 data_obj.insert(config.target.clone(), source_data.clone());
140 }
141
142 message.invalidate_context_cache();
144
145 debug!(
146 "ParseJson: Successfully stored data to 'data.{}'",
147 config.target
148 );
149
150 Ok((
151 200,
152 vec![Change {
153 path: Arc::from(format!("data.{}", config.target)),
154 old_value: Arc::new(old_value),
155 new_value: Arc::new(source_data),
156 }],
157 ))
158}
159
160pub fn execute_parse_xml(
173 message: &mut Message,
174 config: &ParseConfig,
175) -> Result<(usize, Vec<Change>)> {
176 debug!(
177 "ParseXml: Extracting from '{}' to 'data.{}'",
178 config.source, config.target
179 );
180
181 let source_data = config.extract_source(message);
183
184 let xml_string = match &source_data {
186 Value::String(s) => s.clone(),
187 _ => {
188 return Err(DataflowError::Validation(format!(
189 "ParseXml: Source '{}' is not a string",
190 config.source
191 )));
192 }
193 };
194
195 let parsed_json = xml_to_json(&xml_string)?;
197
198 let old_value = message
200 .data()
201 .get(&config.target)
202 .cloned()
203 .unwrap_or(Value::Null);
204
205 if let Some(data_obj) = message.data_mut().as_object_mut() {
207 data_obj.insert(config.target.clone(), parsed_json.clone());
208 }
209
210 message.invalidate_context_cache();
212
213 debug!(
214 "ParseXml: Successfully parsed and stored XML to 'data.{}'",
215 config.target
216 );
217
218 Ok((
219 200,
220 vec![Change {
221 path: Arc::from(format!("data.{}", config.target)),
222 old_value: Arc::new(old_value),
223 new_value: Arc::new(parsed_json),
224 }],
225 ))
226}
227
228fn xml_to_json(xml: &str) -> Result<Value> {
237 use quick_xml::de::from_str;
238
239 let parsed: Value = from_str(xml)
241 .map_err(|e| DataflowError::Validation(format!("Failed to parse XML: {}", e)))?;
242
243 Ok(parsed)
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_parse_config_from_json() {
        let input = json!({
            "source": "payload",
            "target": "input_data"
        });

        let config = ParseConfig::from_json(&input).unwrap();
        assert_eq!(config.source, "payload");
        assert_eq!(config.target, "input_data");
    }

    #[test]
    fn test_parse_config_missing_source() {
        let input = json!({
            "target": "input_data"
        });

        let result = ParseConfig::from_json(&input);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_config_missing_target() {
        let input = json!({
            "source": "payload"
        });

        let result = ParseConfig::from_json(&input);
        assert!(result.is_err());
    }

    #[test]
    fn test_execute_parse_json_from_payload() {
        let payload = json!({
            "name": "John",
            "age": 30
        });
        let mut message = Message::new(Arc::new(payload));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "input".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        let (status, changes) = result.unwrap();
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "data.input");

        assert_eq!(message.data()["input"]["name"], json!("John"));
        assert_eq!(message.data()["input"]["age"], json!(30));
    }

    #[test]
    fn test_execute_parse_json_from_nested_payload() {
        let payload = json!({
            "body": {
                "user": {
                    "name": "Alice"
                }
            }
        });
        let mut message = Message::new(Arc::new(payload));

        let config = ParseConfig {
            source: "payload.body.user".to_string(),
            target: "user_data".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        let (status, _) = result.unwrap();
        assert_eq!(status, 200);

        assert_eq!(message.data()["user_data"]["name"], json!("Alice"));
    }

    #[test]
    fn test_execute_parse_json_from_data() {
        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({
            "existing": {
                "value": 42
            }
        });

        let config = ParseConfig {
            source: "data.existing".to_string(),
            target: "copied".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        assert_eq!(message.data()["copied"]["value"], json!(42));
    }

    #[test]
    fn test_execute_parse_json_missing_path_yields_null() {
        let mut message = Message::new(Arc::new(json!({ "present": 1 })));

        let config = ParseConfig {
            source: "payload.absent".to_string(),
            target: "out".to_string(),
        };

        let (status, changes) = execute_parse_json(&mut message, &config).unwrap();
        assert_eq!(status, 200);
        assert_eq!(*changes[0].new_value, Value::Null);
        // The key is written even though its value is null.
        assert!(message.data().get("out").is_some());
        assert_eq!(message.data()["out"], Value::Null);
    }

    #[test]
    fn test_execute_parse_json_records_old_value() {
        let mut message = Message::new(Arc::new(json!({ "fresh": "new" })));
        message.context["data"] = json!({ "slot": "old" });

        let config = ParseConfig {
            source: "payload.fresh".to_string(),
            target: "slot".to_string(),
        };

        let (_, changes) = execute_parse_json(&mut message, &config).unwrap();
        assert_eq!(changes[0].path.as_ref(), "data.slot");
        assert_eq!(*changes[0].old_value, json!("old"));
        assert_eq!(*changes[0].new_value, json!("new"));
        assert_eq!(message.data()["slot"], json!("new"));
    }

    #[test]
    fn test_execute_parse_json_from_context_path() {
        let mut message = Message::new(Arc::new(json!({})));
        message.context["metadata"] = json!({ "origin": "api" });

        let config = ParseConfig {
            source: "metadata.origin".to_string(),
            target: "from_meta".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());
        assert_eq!(message.data()["from_meta"], json!("api"));
    }

    #[test]
    fn test_execute_parse_xml_simple() {
        let xml_payload = json!("<root><name>John</name><age>30</age></root>");
        let mut message = Message::new(Arc::new(xml_payload));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "parsed".to_string(),
        };

        let result = execute_parse_xml(&mut message, &config);
        assert!(result.is_ok());

        let (status, _) = result.unwrap();
        assert_eq!(status, 200);

        let parsed = &message.data()["parsed"];
        assert!(parsed.is_object());
    }

    #[test]
    fn test_execute_parse_xml_not_string() {
        let payload = json!({"not": "a string"});
        let mut message = Message::new(Arc::new(payload));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "parsed".to_string(),
        };

        let result = execute_parse_xml(&mut message, &config);
        assert!(result.is_err());
    }

    #[test]
    fn test_execute_parse_xml_not_string_is_validation_error() {
        let mut message = Message::new(Arc::new(json!(42)));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "parsed".to_string(),
        };

        let result = execute_parse_xml(&mut message, &config);
        assert!(matches!(result, Err(DataflowError::Validation(_))));
        // A rejected source must not leave anything behind in `data`.
        assert!(message.data().get("parsed").is_none());
    }

    #[test]
    fn test_xml_to_json_simple() {
        let xml = "<root><name>Test</name></root>";
        let result = xml_to_json(xml);
        assert!(result.is_ok());

        let json = result.unwrap();
        assert!(json.is_object());
    }

    #[test]
    fn test_xml_to_json_invalid() {
        let xml = "<root><unclosed>";
        let result = xml_to_json(xml);
        assert!(result.is_err());
    }

    #[test]
    fn test_xml_to_json_invalid_reports_validation_error() {
        match xml_to_json("<root><unclosed>") {
            Err(DataflowError::Validation(msg)) => {
                assert!(msg.starts_with("Failed to parse XML: "));
            }
            other => panic!("expected a validation error, got {:?}", other),
        }
    }

    #[test]
    fn test_xml_to_json_with_attributes() {
        let xml = r#"<person id="123"><name>John</name></person>"#;
        let result = xml_to_json(xml);
        assert!(result.is_ok());
    }

    #[test]
    fn test_xml_to_json_nested() {
        let xml = r#"<root><user><name>Alice</name><email>alice@example.com</email></user></root>"#;
        let result = xml_to_json(xml);
        assert!(result.is_ok());

        let json = result.unwrap();
        assert!(json.is_object());
    }
}