Skip to main content

dataflow_rs/engine/functions/
parse.rs

1//! # Parse Function Module
2//!
3//! This module provides parsing capabilities for converting payload data into structured
4//! context data. It supports JSON and XML parsing, allowing workflows to start by loading
5//! payload into the context where it can be accessed by subsequent tasks.
6//!
7//! ## Features
8//!
9//! - Parse JSON payload into data field
10//! - Parse XML payload into JSON data field
11//! - Support for nested source paths (payload.body, data.field)
12//! - Automatic change tracking for audit trails
13//!
14//! ## Example Usage
15//!
16//! ```json
17//! {
18//!     "name": "parse_json",
19//!     "input": {
20//!         "source": "payload",
21//!         "target": "input_data"
22//!     }
23//! }
24//! ```
25
26use crate::engine::error::{DataflowError, Result};
27use crate::engine::message::{Change, Message};
28use crate::engine::utils::get_nested_value;
29use log::debug;
30use serde::Deserialize;
31use serde_json::Value;
32use std::sync::Arc;
33
34/// Configuration for parse functions.
35///
36/// Specifies where to read the source data from and where to store
37/// the parsed result in the data context.
38#[derive(Debug, Clone, Deserialize)]
39pub struct ParseConfig {
40    /// Source path to read from.
41    /// - "payload" - Read the entire payload
42    /// - "payload.field" - Read a nested field from payload
43    /// - "data.field" - Read from existing data context
44    pub source: String,
45
46    /// Target field name in data where the parsed result will be stored.
47    /// The result is stored at `data.{target}`.
48    pub target: String,
49}
50
51impl ParseConfig {
52    /// Parses a `ParseConfig` from a JSON value.
53    ///
54    /// # Arguments
55    /// * `input` - JSON object containing "source" and "target" fields
56    ///
57    /// # Errors
58    /// Returns `DataflowError::Validation` if required fields are missing
59    pub fn from_json(input: &Value) -> Result<Self> {
60        let source = input
61            .get("source")
62            .and_then(Value::as_str)
63            .ok_or_else(|| {
64                DataflowError::Validation("Missing 'source' in parse config".to_string())
65            })?
66            .to_string();
67
68        let target = input
69            .get("target")
70            .and_then(Value::as_str)
71            .ok_or_else(|| {
72                DataflowError::Validation("Missing 'target' in parse config".to_string())
73            })?
74            .to_string();
75
76        Ok(ParseConfig { source, target })
77    }
78
79    /// Extract source data based on the source path configuration.
80    ///
81    /// # Arguments
82    /// * `message` - The message to extract data from
83    ///
84    /// # Returns
85    /// The extracted value, or Value::Null if not found
86    fn extract_source(&self, message: &Message) -> Value {
87        if self.source == "payload" {
88            (*message.payload).clone()
89        } else if let Some(path) = self.source.strip_prefix("payload.") {
90            get_nested_value(&message.payload, path)
91                .cloned()
92                .unwrap_or(Value::Null)
93        } else if let Some(path) = self.source.strip_prefix("data.") {
94            get_nested_value(message.data(), path)
95                .cloned()
96                .unwrap_or(Value::Null)
97        } else {
98            // Try to get from context directly
99            get_nested_value(&message.context, &self.source)
100                .cloned()
101                .unwrap_or(Value::Null)
102        }
103    }
104}
105
106/// Execute parse_json operation.
107///
108/// Extracts JSON data from the source path and stores it in the target data field.
109/// This is typically used at the start of a workflow to load payload into context.
110///
111/// # Arguments
112/// * `message` - The message to process (modified in place)
113/// * `config` - Parse configuration specifying source and target
114///
115/// # Returns
116/// * `Ok((200, changes))` - Success with list of changes for audit trail
117/// * `Err` - If configuration is invalid
118pub fn execute_parse_json(
119    message: &mut Message,
120    config: &ParseConfig,
121) -> Result<(usize, Vec<Change>)> {
122    debug!(
123        "ParseJson: Extracting from '{}' to 'data.{}'",
124        config.source, config.target
125    );
126
127    // Extract source data
128    let source_data = config.extract_source(message);
129
130    // Get old value for change tracking
131    let old_value = message
132        .data()
133        .get(&config.target)
134        .cloned()
135        .unwrap_or(Value::Null);
136
137    // Store to target in data
138    if let Some(data_obj) = message.data_mut().as_object_mut() {
139        data_obj.insert(config.target.clone(), source_data.clone());
140    }
141
142    // Invalidate context cache
143    message.invalidate_context_cache();
144
145    debug!(
146        "ParseJson: Successfully stored data to 'data.{}'",
147        config.target
148    );
149
150    Ok((
151        200,
152        vec![Change {
153            path: Arc::from(format!("data.{}", config.target)),
154            old_value: Arc::new(old_value),
155            new_value: Arc::new(source_data),
156        }],
157    ))
158}
159
160/// Execute parse_xml operation.
161///
162/// Extracts XML string from the source path, parses it to JSON, and stores
163/// it in the target data field.
164///
165/// # Arguments
166/// * `message` - The message to process (modified in place)
167/// * `config` - Parse configuration specifying source and target
168///
169/// # Returns
170/// * `Ok((200, changes))` - Success with list of changes for audit trail
171/// * `Err` - If configuration is invalid or XML parsing fails
172pub fn execute_parse_xml(
173    message: &mut Message,
174    config: &ParseConfig,
175) -> Result<(usize, Vec<Change>)> {
176    debug!(
177        "ParseXml: Extracting from '{}' to 'data.{}'",
178        config.source, config.target
179    );
180
181    // Extract source data
182    let source_data = config.extract_source(message);
183
184    // Get XML string
185    let xml_string = match &source_data {
186        Value::String(s) => s.clone(),
187        _ => {
188            return Err(DataflowError::Validation(format!(
189                "ParseXml: Source '{}' is not a string",
190                config.source
191            )));
192        }
193    };
194
195    // Parse XML to JSON
196    let parsed_json = xml_to_json(&xml_string)?;
197
198    // Get old value for change tracking
199    let old_value = message
200        .data()
201        .get(&config.target)
202        .cloned()
203        .unwrap_or(Value::Null);
204
205    // Store to target in data
206    if let Some(data_obj) = message.data_mut().as_object_mut() {
207        data_obj.insert(config.target.clone(), parsed_json.clone());
208    }
209
210    // Invalidate context cache
211    message.invalidate_context_cache();
212
213    debug!(
214        "ParseXml: Successfully parsed and stored XML to 'data.{}'",
215        config.target
216    );
217
218    Ok((
219        200,
220        vec![Change {
221            path: Arc::from(format!("data.{}", config.target)),
222            old_value: Arc::new(old_value),
223            new_value: Arc::new(parsed_json),
224        }],
225    ))
226}
227
228/// Convert XML string to JSON Value.
229///
230/// Uses quick-xml with serde for conversion. The resulting JSON structure
231/// follows the convention where:
232/// - Element names become object keys
233/// - Text content is stored under "$text" key
234/// - Attributes are stored under "$attr" key
235/// - Multiple child elements with same name become arrays
236fn xml_to_json(xml: &str) -> Result<Value> {
237    use quick_xml::de::from_str;
238
239    // Parse XML to JSON using quick-xml's serde support
240    let parsed: Value = from_str(xml)
241        .map_err(|e| DataflowError::Validation(format!("Failed to parse XML: {}", e)))?;
242
243    Ok(parsed)
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_parse_config_from_json() {
        let input = json!({
            "source": "payload",
            "target": "input_data"
        });

        let config = ParseConfig::from_json(&input).unwrap();
        assert_eq!(config.source, "payload");
        assert_eq!(config.target, "input_data");
    }

    #[test]
    fn test_parse_config_missing_source() {
        let input = json!({
            "target": "input_data"
        });

        let result = ParseConfig::from_json(&input);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_config_missing_target() {
        let input = json!({
            "source": "payload"
        });

        let result = ParseConfig::from_json(&input);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_config_non_string_fields() {
        // Present-but-wrong-type fields must be rejected, not accepted or coerced.
        assert!(ParseConfig::from_json(&json!({"source": 42, "target": "t"})).is_err());
        assert!(ParseConfig::from_json(&json!({"source": "payload", "target": ["t"]})).is_err());
    }

    #[test]
    fn test_execute_parse_json_from_payload() {
        let payload = json!({
            "name": "John",
            "age": 30
        });
        let mut message = Message::new(Arc::new(payload));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "input".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        let (status, changes) = result.unwrap();
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "data.input");

        // Verify data was stored
        assert_eq!(message.data()["input"]["name"], json!("John"));
        assert_eq!(message.data()["input"]["age"], json!(30));
    }

    #[test]
    fn test_execute_parse_json_from_nested_payload() {
        let payload = json!({
            "body": {
                "user": {
                    "name": "Alice"
                }
            }
        });
        let mut message = Message::new(Arc::new(payload));

        let config = ParseConfig {
            source: "payload.body.user".to_string(),
            target: "user_data".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        let (status, _) = result.unwrap();
        assert_eq!(status, 200);

        // Verify nested data was extracted
        assert_eq!(message.data()["user_data"]["name"], json!("Alice"));
    }

    #[test]
    fn test_execute_parse_json_from_data() {
        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({
            "existing": {
                "value": 42
            }
        });

        let config = ParseConfig {
            source: "data.existing".to_string(),
            target: "copied".to_string(),
        };

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        // Verify data was copied
        assert_eq!(message.data()["copied"]["value"], json!(42));
    }

    #[test]
    fn test_execute_parse_json_missing_source_stores_null() {
        let mut message = Message::new(Arc::new(json!({"present": 1})));

        let config = ParseConfig {
            source: "payload.absent".to_string(),
            target: "out".to_string(),
        };

        let (status, changes) = execute_parse_json(&mut message, &config).unwrap();
        assert_eq!(status, 200);
        // The key must exist and hold null, not merely be missing.
        assert_eq!(message.data().get("out"), Some(&Value::Null));
        assert_eq!(*changes[0].new_value, Value::Null);
    }

    #[test]
    fn test_execute_parse_json_tracks_old_value_on_overwrite() {
        let mut message = Message::new(Arc::new(json!({"v": 1})));

        let first = ParseConfig {
            source: "payload".to_string(),
            target: "slot".to_string(),
        };
        let (_, first_changes) = execute_parse_json(&mut message, &first).unwrap();
        assert_eq!(*first_changes[0].old_value, Value::Null);

        let second = ParseConfig {
            source: "payload.v".to_string(),
            target: "slot".to_string(),
        };
        let (_, changes) = execute_parse_json(&mut message, &second).unwrap();
        assert_eq!(*changes[0].old_value, json!({"v": 1}));
        assert_eq!(*changes[0].new_value, json!(1));
        assert_eq!(message.data()["slot"], json!(1));
    }

    #[test]
    fn test_execute_parse_xml_simple() {
        let xml_payload = json!("<root><name>John</name><age>30</age></root>");
        let mut message = Message::new(Arc::new(xml_payload));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "parsed".to_string(),
        };

        let result = execute_parse_xml(&mut message, &config);
        assert!(result.is_ok());

        let (status, _) = result.unwrap();
        assert_eq!(status, 200);

        // Verify XML was parsed
        let parsed = &message.data()["parsed"];
        assert!(parsed.is_object());
    }

    #[test]
    fn test_execute_parse_xml_not_string() {
        let payload = json!({"not": "a string"});
        let mut message = Message::new(Arc::new(payload));

        let config = ParseConfig {
            source: "payload".to_string(),
            target: "parsed".to_string(),
        };

        let result = execute_parse_xml(&mut message, &config);
        assert!(result.is_err());
    }

    #[test]
    fn test_xml_to_json_simple() {
        let xml = "<root><name>Test</name></root>";
        let result = xml_to_json(xml);
        assert!(result.is_ok());

        let json = result.unwrap();
        assert!(json.is_object());
    }

    #[test]
    fn test_xml_to_json_invalid() {
        // Test with unclosed tag
        let xml = "<root><unclosed>";
        let result = xml_to_json(xml);
        assert!(result.is_err());
    }

    #[test]
    fn test_xml_to_json_with_attributes() {
        let xml = r#"<person id="123"><name>John</name></person>"#;
        let result = xml_to_json(xml);
        assert!(result.is_ok());
    }

    #[test]
    fn test_xml_to_json_nested() {
        let xml = r#"<root><user><name>Alice</name><email>alice@example.com</email></user></root>"#;
        let result = xml_to_json(xml);
        assert!(result.is_ok());

        let json = result.unwrap();
        assert!(json.is_object());
    }
}