Skip to main content

dataflow_rs/engine/functions/
parse.rs

//! # Parse Function Module
//!
//! Parsing helpers that load payload data into the message's `data` context.
//! Supports JSON (native) and XML (bridged through `serde_json::Value`; XML is
//! the slow path and is not worth a dedicated walker).
//!
//! Source paths:
//! - `"payload"` — the entire payload
//! - `"payload.<path>"` — a nested field of the payload
//! - `"data.<path>"` — a nested field of the existing data context
//! - `"<path>"` — anything else is resolved against the full message context
12
13use crate::engine::error::{DataflowError, Result};
14use crate::engine::executor::ArenaContext;
15use crate::engine::message::{Change, Message};
16use crate::engine::task_outcome::TaskOutcome;
17use crate::engine::utils::{get_nested_value, set_nested_value};
18use datavalue::OwnedDataValue;
19use log::debug;
20use serde::Deserialize;
21use serde_json::Value;
22use std::sync::Arc;
23
24/// Configuration for parse functions.
25#[derive(Debug, Clone, Deserialize)]
26pub struct ParseConfig {
27    /// Source path to read from.
28    pub source: String,
29
30    /// Target field name in `data` (stored at `data.{target}`).
31    pub target: String,
32}
33
34impl ParseConfig {
35    pub fn from_json(input: &Value) -> Result<Self> {
36        let source = input
37            .get("source")
38            .and_then(Value::as_str)
39            .ok_or_else(|| {
40                DataflowError::Validation("Missing 'source' in parse config".to_string())
41            })?
42            .to_string();
43
44        let target = input
45            .get("target")
46            .and_then(Value::as_str)
47            .ok_or_else(|| {
48                DataflowError::Validation("Missing 'target' in parse config".to_string())
49            })?
50            .to_string();
51
52        Ok(ParseConfig { source, target })
53    }
54
55    /// Extract the source value as an owned `OwnedDataValue`.
56    fn extract_source(&self, message: &Message) -> OwnedDataValue {
57        if self.source == "payload" {
58            (*message.payload).clone()
59        } else if let Some(path) = self.source.strip_prefix("payload.") {
60            get_nested_value(&message.payload, path)
61                .cloned()
62                .unwrap_or(OwnedDataValue::Null)
63        } else if let Some(path) = self.source.strip_prefix("data.") {
64            get_nested_value(message.data(), path)
65                .cloned()
66                .unwrap_or(OwnedDataValue::Null)
67        } else {
68            get_nested_value(&message.context, &self.source)
69                .cloned()
70                .unwrap_or(OwnedDataValue::Null)
71        }
72    }
73}
74
75/// Execute `parse_json`: read the source value and store it under `data.{target}`.
76/// If the source is a JSON string, attempt to parse it; on failure, store the
77/// string as-is (matches prior behaviour).
78pub fn execute_parse_json(
79    message: &mut Message,
80    config: &ParseConfig,
81) -> Result<(TaskOutcome, Vec<Change>)> {
82    debug!(
83        "ParseJson: Extracting from '{}' to 'data.{}'",
84        config.source, config.target
85    );
86
87    let target_path = format!("data.{}", config.target);
88
89    // Hot path: source == "payload" and not a JSON-string payload. The
90    // payload Arc is already on the message; clone-into-context once, reuse
91    // the Arc for the audit entry (refcount bump). This is the realistic
92    // benchmark's exact shape.
93    let payload_fast_path =
94        config.source == "payload" && !matches!(*message.payload, OwnedDataValue::String(_));
95
96    if message.capture_changes {
97        let old_value = get_nested_value(&message.context, &target_path)
98            .cloned()
99            .unwrap_or(OwnedDataValue::Null);
100
101        // Resolve the source value once. For the payload fast-path we clone
102        // out of the shared `Arc<OwnedDataValue>` payload; for the slow path
103        // we extract from a sub-tree and re-parse JSON-string payloads.
104        let source_data = if payload_fast_path {
105            (*message.payload).clone()
106        } else {
107            let raw = config.extract_source(message);
108            match &raw {
109                OwnedDataValue::String(s) => {
110                    OwnedDataValue::from_json(s).unwrap_or_else(|_| raw.clone())
111                }
112                _ => raw,
113            }
114        };
115
116        // Clone the source value once for the audit `new_value`; the original
117        // is moved into the context below. (No `Arc` wrapping in the audit
118        // entry — `Change` owns its values directly.)
119        let new_value = source_data.clone();
120
121        set_nested_value(&mut message.context, &target_path, source_data);
122        debug!(
123            "ParseJson: Successfully stored data to 'data.{}'",
124            config.target
125        );
126        return Ok((
127            TaskOutcome::Success,
128            vec![Change {
129                path: Arc::from(target_path),
130                old_value,
131                new_value,
132            }],
133        ));
134    }
135
136    // Audit-off fast path: only the deep clone into the context survives.
137    let source_data_for_context: OwnedDataValue = if payload_fast_path {
138        (*message.payload).clone()
139    } else {
140        let raw = config.extract_source(message);
141        match &raw {
142            OwnedDataValue::String(s) => {
143                OwnedDataValue::from_json(s).unwrap_or_else(|_| raw.clone())
144            }
145            _ => raw,
146        }
147    };
148    set_nested_value(&mut message.context, &target_path, source_data_for_context);
149
150    debug!(
151        "ParseJson: Successfully stored data to 'data.{}'",
152        config.target
153    );
154
155    Ok((TaskOutcome::Success, Vec::new()))
156}
157
158/// Same as `execute_parse_json` but also refreshes the supplied
159/// `ArenaContext` so subsequent sync tasks in the same workflow stretch see
160/// the written `data.<target>` slot without rebuilding the whole arena form.
161pub(crate) fn execute_parse_json_in_arena(
162    message: &mut Message,
163    config: &ParseConfig,
164    arena_ctx: &mut ArenaContext<'_>,
165) -> Result<(TaskOutcome, Vec<Change>)> {
166    // Resolve the write target before calling execute_parse_json so we can
167    // refresh the arena slot afterwards using the same path.
168    let target_path = format!("data.{}", config.target);
169    let result = execute_parse_json(message, config)?;
170    // Refresh ONLY the affected depth-2 slot in the arena cache. For
171    // source == "payload" target = "input", this is `data.input` — the
172    // heavy slot — but it's re-arena'd exactly once per workflow stretch
173    // here, not once per subsequent map mapping.
174    arena_ctx.refresh_for_path(&message.context, &target_path);
175    Ok(result)
176}
177
178/// Execute `parse_xml`: read the source string, parse XML into a
179/// `serde_json::Value` (existing quick-xml path), convert to `OwnedDataValue`,
180/// store under `data.{target}`.
181pub fn execute_parse_xml(
182    message: &mut Message,
183    config: &ParseConfig,
184) -> Result<(TaskOutcome, Vec<Change>)> {
185    debug!(
186        "ParseXml: Extracting from '{}' to 'data.{}'",
187        config.source, config.target
188    );
189
190    let source_data = config.extract_source(message);
191
192    let xml_string = match &source_data {
193        OwnedDataValue::String(s) => s.clone(),
194        _ => {
195            return Err(DataflowError::Validation(format!(
196                "ParseXml: Source '{}' is not a string",
197                config.source
198            )));
199        }
200    };
201
202    let parsed_json = xml_to_json(&xml_string)?;
203    let parsed_owned = OwnedDataValue::from(&parsed_json);
204
205    let target_path = format!("data.{}", config.target);
206    let old_value = get_nested_value(&message.context, &target_path)
207        .cloned()
208        .unwrap_or(OwnedDataValue::Null);
209
210    set_nested_value(&mut message.context, &target_path, parsed_owned.clone());
211
212    debug!(
213        "ParseXml: Successfully parsed and stored XML to 'data.{}'",
214        config.target
215    );
216
217    Ok((
218        TaskOutcome::Success,
219        vec![Change {
220            path: Arc::from(target_path),
221            old_value,
222            new_value: parsed_owned,
223        }],
224    ))
225}
226
227/// Convert an XML string to `serde_json::Value` using quick-xml's serde path.
228fn xml_to_json(xml: &str) -> Result<Value> {
229    use quick_xml::de::from_str;
230
231    let parsed: Value = from_str(xml)
232        .map_err(|e| DataflowError::Validation(format!("Failed to parse XML: {}", e)))?;
233
234    Ok(parsed)
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Convert a `serde_json::Value` into the engine's owned value type.
    fn dv(v: serde_json::Value) -> OwnedDataValue {
        OwnedDataValue::from(&v)
    }

    /// Build a `ParseConfig` directly, without going through JSON.
    fn parse_config(source: &str, target: &str) -> ParseConfig {
        ParseConfig {
            source: source.to_string(),
            target: target.to_string(),
        }
    }

    #[test]
    fn test_parse_config_from_json() {
        let input = json!({"source": "payload", "target": "input_data"});
        let config = ParseConfig::from_json(&input).unwrap();
        assert_eq!(config.source, "payload");
        assert_eq!(config.target, "input_data");
    }

    #[test]
    fn test_parse_config_missing_source() {
        assert!(ParseConfig::from_json(&json!({"target": "input_data"})).is_err());
    }

    #[test]
    fn test_parse_config_missing_target() {
        assert!(ParseConfig::from_json(&json!({"source": "payload"})).is_err());
    }

    #[test]
    fn test_execute_parse_json_from_payload() {
        let payload = json!({"name": "John", "age": 30});
        let mut message = Message::from_value(&payload);
        let config = parse_config("payload", "input");

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        let (outcome, changes) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "data.input");

        assert_eq!(message.data()["input"]["name"], dv(json!("John")));
        assert_eq!(message.data()["input"]["age"], dv(json!(30)));
    }

    /// The recorded change carries the value before and after the write.
    #[test]
    fn test_execute_parse_json_change_values() {
        let payload = json!({"name": "John"});
        let mut message = Message::from_value(&payload);
        let config = parse_config("payload", "input");

        let (_, changes) = execute_parse_json(&mut message, &config).unwrap();
        assert_eq!(changes.len(), 1);
        // Nothing was stored at `data.input` before the call.
        assert_eq!(changes[0].old_value, OwnedDataValue::Null);
        assert_eq!(changes[0].new_value, dv(payload));
    }

    /// With change capture disabled the value is still written, but no
    /// change record is returned.
    #[test]
    fn test_execute_parse_json_without_change_capture() {
        let payload = json!({"name": "John", "age": 30});
        let mut message = Message::from_value(&payload);
        message.capture_changes = false;
        let config = parse_config("payload", "input");

        let (outcome, changes) = execute_parse_json(&mut message, &config).unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert!(changes.is_empty());

        assert_eq!(message.data()["input"]["name"], dv(json!("John")));
        assert_eq!(message.data()["input"]["age"], dv(json!(30)));
    }

    #[test]
    fn test_execute_parse_json_from_nested_payload() {
        let payload = json!({"body": {"user": {"name": "Alice"}}});
        let mut message = Message::from_value(&payload);
        let config = parse_config("payload.body.user", "user_data");

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        let (outcome, _) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert_eq!(message.data()["user_data"]["name"], dv(json!("Alice")));
    }

    #[test]
    fn test_execute_parse_json_from_data() {
        let mut message = Message::new(Arc::new(dv(json!({}))));
        set_nested_value(
            &mut message.context,
            "data",
            dv(json!({"existing": {"value": 42}})),
        );
        let config = parse_config("data.existing", "copied");

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        assert_eq!(message.data()["copied"]["value"], dv(json!(42)));
    }

    #[test]
    fn test_execute_parse_json_from_string_payload() {
        let payload = Value::String(r#"{"name":"John","age":30}"#.to_string());
        let mut message = Message::from_value(&payload);
        let config = parse_config("payload", "input");

        let result = execute_parse_json(&mut message, &config);
        assert!(result.is_ok());

        let (outcome, _) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);

        assert_eq!(message.data()["input"]["name"], dv(json!("John")));
        assert_eq!(message.data()["input"]["age"], dv(json!(30)));
    }

    /// A string source that is not valid JSON is stored unchanged.
    #[test]
    fn test_execute_parse_json_non_json_string_is_kept() {
        let payload = Value::String("hello world".to_string());
        let mut message = Message::from_value(&payload);
        let config = parse_config("payload", "input");

        let (outcome, _) = execute_parse_json(&mut message, &config).unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert_eq!(message.data()["input"], dv(json!("hello world")));
    }

    #[test]
    fn test_execute_parse_xml_simple() {
        let xml_payload = json!("<root><name>John</name><age>30</age></root>");
        let mut message = Message::from_value(&xml_payload);
        let config = parse_config("payload", "parsed");

        let result = execute_parse_xml(&mut message, &config);
        assert!(result.is_ok());

        let (outcome, _) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);

        let parsed = &message.data()["parsed"];
        assert!(parsed.is_object());
    }

    #[test]
    fn test_execute_parse_xml_not_string() {
        let payload = json!({"not": "a string"});
        let mut message = Message::from_value(&payload);
        let config = parse_config("payload", "parsed");

        assert!(execute_parse_xml(&mut message, &config).is_err());
    }

    #[test]
    fn test_xml_to_json_simple() {
        let xml = "<root><name>Test</name></root>";
        let result = xml_to_json(xml);
        assert!(result.is_ok());
        let json = result.unwrap();
        assert!(json.is_object());
    }

    #[test]
    fn test_xml_to_json_invalid() {
        let xml = "<root><unclosed>";
        assert!(xml_to_json(xml).is_err());
    }

    #[test]
    fn test_xml_to_json_with_attributes() {
        let xml = r#"<person id="123"><name>John</name></person>"#;
        assert!(xml_to_json(xml).is_ok());
    }

    #[test]
    fn test_xml_to_json_nested() {
        let xml = r#"<root><user><name>Alice</name><email>alice@example.com</email></user></root>"#;
        let result = xml_to_json(xml);
        assert!(result.is_ok());
        let json = result.unwrap();
        assert!(json.is_object());
    }
}