// dataflow_rs/engine/functions/parse.rs

//! # Parse Function Module
//!
//! This module provides parsing capabilities for converting payload data into structured
//! context data. It supports JSON and XML parsing, allowing workflows to start by loading
//! the payload into the context, where subsequent tasks can access it.
//!
//! ## Features
//!
//! - Parse a JSON payload into a data field
//! - Parse an XML payload into a JSON data field
//! - Support for nested source paths (`payload.body`, `data.field`)
//! - Automatic change tracking for audit trails
//!
//! ## Example Usage
//!
//! ```json
//! {
//!     "name": "parse_json",
//!     "input": {
//!         "source": "payload",
//!         "target": "input_data"
//!     }
//! }
//! ```
25
26use crate::engine::error::{DataflowError, Result};
27use crate::engine::message::{Change, Message};
28use crate::engine::utils::get_nested_value;
29use log::debug;
30use serde::Deserialize;
31use serde_json::Value;
32use std::sync::Arc;
33
34/// Configuration for parse functions.
35///
36/// Specifies where to read the source data from and where to store
37/// the parsed result in the data context.
38#[derive(Debug, Clone, Deserialize)]
39pub struct ParseConfig {
40    /// Source path to read from.
41    /// - "payload" - Read the entire payload
42    /// - "payload.field" - Read a nested field from payload
43    /// - "data.field" - Read from existing data context
44    pub source: String,
45
46    /// Target field name in data where the parsed result will be stored.
47    /// The result is stored at `data.{target}`.
48    pub target: String,
49}
50
51impl ParseConfig {
52    /// Parses a `ParseConfig` from a JSON value.
53    ///
54    /// # Arguments
55    /// * `input` - JSON object containing "source" and "target" fields
56    ///
57    /// # Errors
58    /// Returns `DataflowError::Validation` if required fields are missing
59    pub fn from_json(input: &Value) -> Result<Self> {
60        let source = input
61            .get("source")
62            .and_then(Value::as_str)
63            .ok_or_else(|| {
64                DataflowError::Validation("Missing 'source' in parse config".to_string())
65            })?
66            .to_string();
67
68        let target = input
69            .get("target")
70            .and_then(Value::as_str)
71            .ok_or_else(|| {
72                DataflowError::Validation("Missing 'target' in parse config".to_string())
73            })?
74            .to_string();
75
76        Ok(ParseConfig { source, target })
77    }
78
79    /// Extract source data based on the source path configuration.
80    ///
81    /// # Arguments
82    /// * `message` - The message to extract data from
83    ///
84    /// # Returns
85    /// The extracted value, or Value::Null if not found
86    fn extract_source(&self, message: &Message) -> Value {
87        if self.source == "payload" {
88            (*message.payload).clone()
89        } else if let Some(path) = self.source.strip_prefix("payload.") {
90            get_nested_value(&message.payload, path)
91                .cloned()
92                .unwrap_or(Value::Null)
93        } else if let Some(path) = self.source.strip_prefix("data.") {
94            get_nested_value(message.data(), path)
95                .cloned()
96                .unwrap_or(Value::Null)
97        } else {
98            // Try to get from context directly
99            get_nested_value(&message.context, &self.source)
100                .cloned()
101                .unwrap_or(Value::Null)
102        }
103    }
104}
105
106/// Execute parse_json operation.
107///
108/// Extracts JSON data from the source path and stores it in the target data field.
109/// This is typically used at the start of a workflow to load payload into context.
110///
111/// # Arguments
112/// * `message` - The message to process (modified in place)
113/// * `config` - Parse configuration specifying source and target
114///
115/// # Returns
116/// * `Ok((200, changes))` - Success with list of changes for audit trail
117/// * `Err` - If configuration is invalid
118pub fn execute_parse_json(
119    message: &mut Message,
120    config: &ParseConfig,
121) -> Result<(usize, Vec<Change>)> {
122    debug!(
123        "ParseJson: Extracting from '{}' to 'data.{}'",
124        config.source, config.target
125    );
126
127    // Extract source data
128    let source_data = config.extract_source(message);
129
130    // If source is a JSON string, parse it into a structured value
131    let source_data = match &source_data {
132        Value::String(s) => serde_json::from_str(s).unwrap_or(source_data),
133        _ => source_data,
134    };
135
136    // Get old value for change tracking
137    let old_value = message
138        .data()
139        .get(&config.target)
140        .cloned()
141        .unwrap_or(Value::Null);
142
143    // Store to target in data
144    if let Some(data_obj) = message.data_mut().as_object_mut() {
145        data_obj.insert(config.target.clone(), source_data.clone());
146    }
147
148    // Invalidate context cache
149    message.invalidate_context_cache();
150
151    debug!(
152        "ParseJson: Successfully stored data to 'data.{}'",
153        config.target
154    );
155
156    Ok((
157        200,
158        vec![Change {
159            path: Arc::from(format!("data.{}", config.target)),
160            old_value: Arc::new(old_value),
161            new_value: Arc::new(source_data),
162        }],
163    ))
164}
165
166/// Execute parse_xml operation.
167///
168/// Extracts XML string from the source path, parses it to JSON, and stores
169/// it in the target data field.
170///
171/// # Arguments
172/// * `message` - The message to process (modified in place)
173/// * `config` - Parse configuration specifying source and target
174///
175/// # Returns
176/// * `Ok((200, changes))` - Success with list of changes for audit trail
177/// * `Err` - If configuration is invalid or XML parsing fails
178pub fn execute_parse_xml(
179    message: &mut Message,
180    config: &ParseConfig,
181) -> Result<(usize, Vec<Change>)> {
182    debug!(
183        "ParseXml: Extracting from '{}' to 'data.{}'",
184        config.source, config.target
185    );
186
187    // Extract source data
188    let source_data = config.extract_source(message);
189
190    // Get XML string
191    let xml_string = match &source_data {
192        Value::String(s) => s.clone(),
193        _ => {
194            return Err(DataflowError::Validation(format!(
195                "ParseXml: Source '{}' is not a string",
196                config.source
197            )));
198        }
199    };
200
201    // Parse XML to JSON
202    let parsed_json = xml_to_json(&xml_string)?;
203
204    // Get old value for change tracking
205    let old_value = message
206        .data()
207        .get(&config.target)
208        .cloned()
209        .unwrap_or(Value::Null);
210
211    // Store to target in data
212    if let Some(data_obj) = message.data_mut().as_object_mut() {
213        data_obj.insert(config.target.clone(), parsed_json.clone());
214    }
215
216    // Invalidate context cache
217    message.invalidate_context_cache();
218
219    debug!(
220        "ParseXml: Successfully parsed and stored XML to 'data.{}'",
221        config.target
222    );
223
224    Ok((
225        200,
226        vec![Change {
227            path: Arc::from(format!("data.{}", config.target)),
228            old_value: Arc::new(old_value),
229            new_value: Arc::new(parsed_json),
230        }],
231    ))
232}
233
234/// Convert XML string to JSON Value.
235///
236/// Uses quick-xml with serde for conversion. The resulting JSON structure
237/// follows the convention where:
238/// - Element names become object keys
239/// - Text content is stored under "$text" key
240/// - Attributes are stored under "$attr" key
241/// - Multiple child elements with same name become arrays
242fn xml_to_json(xml: &str) -> Result<Value> {
243    use quick_xml::de::from_str;
244
245    // Parse XML to JSON using quick-xml's serde support
246    let parsed: Value = from_str(xml)
247        .map_err(|e| DataflowError::Validation(format!("Failed to parse XML: {}", e)))?;
248
249    Ok(parsed)
250}
251
#[cfg(test)]
mod tests {
    // Covers config parsing, JSON/XML extraction paths, error cases and the
    // old/new values recorded for the audit trail.
    use super::*;
    use serde_json::json;

    #[test]
    fn test_parse_config_from_json() {
        let input = json!({
            "source": "payload",
            "target": "input_data"
        });

        let config = ParseConfig::from_json(&input).unwrap();
        assert_eq!(config.source, "payload");
        assert_eq!(config.target, "input_data");
    }

    #[test]
    fn test_parse_config_missing_source() {
        let input = json!({
            "target": "input_data"
        });

        let result = ParseConfig::from_json(&input);
        assert!(matches!(result, Err(DataflowError::Validation(_))));
    }

    #[test]
    fn test_parse_config_missing_target() {
        let input = json!({
            "source": "payload"
        });

        let result = ParseConfig::from_json(&input);
        assert!(matches!(result, Err(DataflowError::Validation(_))));
    }

    #[test]
    fn test_parse_config_non_string_source() {
        let input = json!({
            "source": 42,
            "target": "input_data"
        });

        let result = ParseConfig::from_json(&input);
        assert!(matches!(result, Err(DataflowError::Validation(_))));
    }

    #[test]
    fn test_execute_parse_json_from_payload() {
        let payload = json!({
            "name": "John",
            "age": 30
        });
        let mut message = Message::new(Arc::new(payload));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "input".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        let (status, changes) = result.unwrap();
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "data.input");

        // Verify data was stored
        assert_eq!(message.data()["input"]["name"], json!("John"));
        assert_eq!(message.data()["input"]["age"], json!(30));
    }

    #[test]
    fn test_execute_parse_json_from_nested_payload() {
        let payload = json!({
            "body": {
                "user": {
                    "name": "Alice"
                }
            }
        });
        let mut message = Message::new(Arc::new(payload));

        let config = ParseConfig {
            source: "payload.body.user".to_string(),
            target: "user_data".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        let (status, _) = result.unwrap();
        assert_eq!(status, 200);

        // Verify nested data was extracted
        assert_eq!(message.data()["user_data"]["name"], json!("Alice"));
    }

    #[test]
    fn test_execute_parse_json_from_data() {
        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({
            "existing": {
                "value": 42
            }
        });

        let config = ParseConfig {
            source: "data.existing".to_string(),
            target: "copied".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        // Verify data was copied
        assert_eq!(message.data()["copied"]["value"], json!(42));
    }

    #[test]
    fn test_execute_parse_json_tracks_old_value() {
        let mut message = Message::new(Arc::new(json!({"v": 1})));
        message.context["data"] = json!({"input": "old"});

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "input".to_string(),
        };

        let (_, changes) = execute_parse_json(&mut message, &config).unwrap();
        assert_eq!(changes.len(), 1);
        assert_eq!(*changes[0].old_value, json!("old"));
        assert_eq!(*changes[0].new_value, json!({"v": 1}));
        assert_eq!(message.data()["input"], json!({"v": 1}));
    }

    #[test]
    fn test_execute_parse_json_keeps_non_json_string() {
        let payload = Value::String("plain text, not JSON".to_string());
        let mut message = Message::new(Arc::new(payload));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "input".to_string(),
        };

        execute_parse_json(&mut message, &config).unwrap();

        // Unparseable strings are stored verbatim rather than rejected.
        assert_eq!(message.data()["input"], json!("plain text, not JSON"));
    }

    #[test]
    fn test_execute_parse_json_missing_source_stores_null() {
        let mut message = Message::new(Arc::new(json!({})));

        let config = ParseConfig {
            source: "payload.missing".to_string(),
            target: "input".to_string(),
        };

        let (status, changes) = execute_parse_json(&mut message, &config).unwrap();
        assert_eq!(status, 200);
        // `get` distinguishes "stored null" from "never stored" (indexing cannot).
        assert_eq!(message.data().get("input"), Some(&Value::Null));
        assert_eq!(*changes[0].new_value, Value::Null);
    }

    #[test]
    fn test_execute_parse_xml_simple() {
        let xml_payload = json!("<root><name>John</name><age>30</age></root>");
        let mut message = Message::new(Arc::new(xml_payload));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "parsed".to_string(),
        };

        let result = execute_parse_xml(&mut message, &config);
        assert!(result.is_ok());

        let (status, _) = result.unwrap();
        assert_eq!(status, 200);

        // Verify XML was parsed
        let parsed = &message.data()["parsed"];
        assert!(parsed.is_object());
    }

    #[test]
    fn test_execute_parse_xml_from_nested_payload() {
        let payload = json!({"body": "<root><name>John</name></root>"});
        let mut message = Message::new(Arc::new(payload));

        let config = ParseConfig {
            source: "payload.body".to_string(),
            target: "parsed".to_string(),
        };

        let (status, changes) = execute_parse_xml(&mut message, &config).unwrap();
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "data.parsed");
        assert!(message.data()["parsed"].is_object());
    }

    #[test]
    fn test_execute_parse_xml_not_string() {
        let payload = json!({"not": "a string"});
        let mut message = Message::new(Arc::new(payload));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "parsed".to_string(),
        };

        let result = execute_parse_xml(&mut message, &config);
        assert!(result.is_err());
    }

    #[test]
    fn test_execute_parse_xml_missing_source() {
        let mut message = Message::new(Arc::new(json!({})));

        let config = ParseConfig {
            source: "payload.xml".to_string(),
            target: "parsed".to_string(),
        };

        // A missing path resolves to null, which is not an XML string.
        let result = execute_parse_xml(&mut message, &config);
        assert!(result.is_err());
    }

    #[test]
    fn test_execute_parse_xml_invalid_leaves_data_untouched() {
        let mut message = Message::new(Arc::new(json!("<root><unclosed>")));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "parsed".to_string(),
        };

        let result = execute_parse_xml(&mut message, &config);
        assert!(result.is_err());
        assert!(message.data().get("parsed").is_none());
    }

    #[test]
    fn test_xml_to_json_simple() {
        let xml = "<root><name>Test</name></root>";
        let result = xml_to_json(xml);
        assert!(result.is_ok());

        let json = result.unwrap();
        assert!(json.is_object());
    }

    #[test]
    fn test_xml_to_json_invalid() {
        // Test with unclosed tag
        let xml = "<root><unclosed>";
        let result = xml_to_json(xml);
        assert!(result.is_err());
    }

    #[test]
    fn test_xml_to_json_with_attributes() {
        let xml = r#"<person id="123"><name>John</name></person>"#;
        let result = xml_to_json(xml);
        assert!(result.is_ok());
    }

    #[test]
    fn test_xml_to_json_nested() {
        let xml = r#"<root><user><name>Alice</name><email>alice@example.com</email></user></root>"#;
        let result = xml_to_json(xml);
        assert!(result.is_ok());

        let json = result.unwrap();
        assert!(json.is_object());
    }

    #[test]
    fn test_execute_parse_json_from_string_payload() {
        // Simulate WASM layer storing payload as a raw JSON string
        let payload = Value::String(r#"{"name":"John","age":30}"#.to_string());
        let mut message = Message::new(Arc::new(payload));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "input".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        let (status, _) = result.unwrap();
        assert_eq!(status, 200);

        // Verify the JSON string was parsed into a structured value
        assert_eq!(message.data()["input"]["name"], json!("John"));
        assert_eq!(message.data()["input"]["age"], json!(30));
    }
}