Skip to main content

dataflow_rs/engine/functions/
publish.rs

1//! # Publish Function Module
2//!
3//! Serialises a slice of the message's `data` context to a JSON or XML string
4//! and stores it back under `data.{target}`. JSON uses `OwnedDataValue`'s
5//! native `to_json_string`; pretty-printed JSON and XML both bridge through
6//! `serde_json::Value` since neither is on the hot path.
7
8use crate::engine::error::{DataflowError, Result};
9use crate::engine::message::{Change, Message};
10use crate::engine::task_outcome::TaskOutcome;
11use crate::engine::utils::{get_nested_value, set_nested_value};
12use datavalue::OwnedDataValue;
13use log::debug;
14use serde::Deserialize;
15use serde_json::Value;
16use std::sync::Arc;
17
/// Configuration shared by the `publish_json` and `publish_xml` functions.
///
/// Can be deserialised with serde or built from a raw JSON object through
/// [`PublishConfig::from_json`].
#[derive(Debug, Clone, Deserialize)]
pub struct PublishConfig {
    /// Source field path inside `data` to serialise. Resolved first as a
    /// direct key of `data`, then as a nested path, and finally with a leading
    /// `data.` prefix stripped.
    pub source: String,

    /// Target field name inside `data` to receive the serialised string
    /// (written at `data.{target}`).
    pub target: String,

    /// Whether to pretty-print the output (JSON only). Defaults to `false`
    /// when omitted.
    #[serde(default)]
    pub pretty: bool,

    /// Root element name for XML output. Defaults to `"root"` when omitted.
    #[serde(default = "default_root_element")]
    pub root_element: String,
}
35
/// Serde default for `PublishConfig::root_element`: the literal name `root`.
fn default_root_element() -> String {
    String::from("root")
}
39
40impl PublishConfig {
41    pub fn from_json(input: &Value) -> Result<Self> {
42        let source = input
43            .get("source")
44            .and_then(Value::as_str)
45            .ok_or_else(|| {
46                DataflowError::Validation("Missing 'source' in publish config".to_string())
47            })?
48            .to_string();
49
50        let target = input
51            .get("target")
52            .and_then(Value::as_str)
53            .ok_or_else(|| {
54                DataflowError::Validation("Missing 'target' in publish config".to_string())
55            })?
56            .to_string();
57
58        let pretty = input
59            .get("pretty")
60            .and_then(Value::as_bool)
61            .unwrap_or(false);
62
63        let root_element = input
64            .get("root_element")
65            .and_then(Value::as_str)
66            .map(String::from)
67            .unwrap_or_else(default_root_element);
68
69        Ok(PublishConfig {
70            source,
71            target,
72            pretty,
73            root_element,
74        })
75    }
76
77    /// Extract the source value as an owned `OwnedDataValue`.
78    fn extract_source(&self, message: &Message) -> OwnedDataValue {
79        // Direct field in `data`.
80        if let Some(value) = message.data().get(&self.source) {
81            return value.clone();
82        }
83
84        // Nested path inside `data`.
85        if let Some(value) = get_nested_value(message.data(), &self.source) {
86            return value.clone();
87        }
88
89        // `data.<path>` shorthand pointing back into `data`.
90        if let Some(path) = self.source.strip_prefix("data.")
91            && let Some(value) = get_nested_value(message.data(), path)
92        {
93            return value.clone();
94        }
95
96        OwnedDataValue::Null
97    }
98}
99
100/// Execute `publish_json`: serialise `data.{source}` to a JSON string and
101/// store at `data.{target}`.
102pub fn execute_publish_json(
103    message: &mut Message,
104    config: &PublishConfig,
105) -> Result<(TaskOutcome, Vec<Change>)> {
106    debug!(
107        "PublishJson: Serializing 'data.{}' to 'data.{}'",
108        config.source, config.target
109    );
110
111    let source_data = config.extract_source(message);
112
113    if matches!(source_data, OwnedDataValue::Null) {
114        return Err(DataflowError::Validation(format!(
115            "PublishJson: Source 'data.{}' not found or is null",
116            config.source
117        )));
118    }
119
120    // For compact JSON, use OwnedDataValue's native emitter (fastest path).
121    // For pretty JSON, bridge to serde_json::Value — pretty publish is not a
122    // hot path and the bridge cost there is irrelevant.
123    let json_string = if config.pretty {
124        let bridge = Value::from(&source_data);
125        serde_json::to_string_pretty(&bridge)
126            .map_err(|e| DataflowError::Validation(format!("Failed to serialize to JSON: {}", e)))?
127    } else {
128        source_data.to_json_string()
129    };
130
131    let target_path = format!("data.{}", config.target);
132    let old_value = get_nested_value(&message.context, &target_path)
133        .cloned()
134        .unwrap_or(OwnedDataValue::Null);
135    let new_value = OwnedDataValue::String(json_string);
136
137    set_nested_value(&mut message.context, &target_path, new_value.clone());
138
139    Ok((
140        TaskOutcome::Success,
141        vec![Change {
142            path: Arc::from(target_path),
143            old_value,
144            new_value,
145        }],
146    ))
147}
148
149/// Execute `publish_xml`: serialise `data.{source}` to an XML string and
150/// store at `data.{target}`. Bridges to `serde_json::Value` for the existing
151/// recursive XML walker — XML is the slow path, no perf concern.
152pub fn execute_publish_xml(
153    message: &mut Message,
154    config: &PublishConfig,
155) -> Result<(TaskOutcome, Vec<Change>)> {
156    debug!(
157        "PublishXml: Serializing 'data.{}' to 'data.{}'",
158        config.source, config.target
159    );
160
161    let source_data = config.extract_source(message);
162
163    if matches!(source_data, OwnedDataValue::Null) {
164        return Err(DataflowError::Validation(format!(
165            "PublishXml: Source 'data.{}' not found or is null",
166            config.source
167        )));
168    }
169
170    let bridge = Value::from(&source_data);
171    let xml_string = json_to_xml(&bridge, &config.root_element)?;
172
173    let target_path = format!("data.{}", config.target);
174    let old_value = get_nested_value(&message.context, &target_path)
175        .cloned()
176        .unwrap_or(OwnedDataValue::Null);
177    let new_value = OwnedDataValue::String(xml_string);
178
179    set_nested_value(&mut message.context, &target_path, new_value.clone());
180
181    Ok((
182        TaskOutcome::Success,
183        vec![Change {
184            path: Arc::from(target_path),
185            old_value,
186            new_value,
187        }],
188    ))
189}
190
191/// Convert JSON Value to XML string. Recursive walker; same shape as before
192/// the OwnedDataValue refactor — kept on `serde_json::Value` since XML is the
193/// slow path.
194fn json_to_xml(value: &Value, root_element: &str) -> Result<String> {
195    let mut buffer = String::new();
196
197    match value {
198        Value::Object(_) => {
199            buffer.push_str(&format!("<{}>", root_element));
200            let content = serialize_value_to_xml_content(value)?;
201            buffer.push_str(&content);
202            buffer.push_str(&format!("</{}>", root_element));
203        }
204        Value::Array(arr) => {
205            buffer.push_str(&format!("<{}>", root_element));
206            for item in arr {
207                buffer.push_str("<item>");
208                let content = serialize_value_to_xml_content(item)?;
209                buffer.push_str(&content);
210                buffer.push_str("</item>");
211            }
212            buffer.push_str(&format!("</{}>", root_element));
213        }
214        _ => {
215            buffer.push_str(&format!("<{}>", root_element));
216            buffer.push_str(&value_to_xml_string(value));
217            buffer.push_str(&format!("</{}>", root_element));
218        }
219    }
220
221    Ok(buffer)
222}
223
224fn serialize_value_to_xml_content(value: &Value) -> Result<String> {
225    let mut result = String::new();
226
227    match value {
228        Value::Object(map) => {
229            for (key, val) in map {
230                let safe_key = sanitize_xml_name(key);
231                result.push_str(&format!("<{}>", safe_key));
232                match val {
233                    Value::Object(_) | Value::Array(_) => {
234                        result.push_str(&serialize_value_to_xml_content(val)?);
235                    }
236                    _ => {
237                        result.push_str(&value_to_xml_string(val));
238                    }
239                }
240                result.push_str(&format!("</{}>", safe_key));
241            }
242        }
243        Value::Array(arr) => {
244            for item in arr {
245                result.push_str("<item>");
246                match item {
247                    Value::Object(_) | Value::Array(_) => {
248                        result.push_str(&serialize_value_to_xml_content(item)?);
249                    }
250                    _ => {
251                        result.push_str(&value_to_xml_string(item));
252                    }
253                }
254                result.push_str("</item>");
255            }
256        }
257        _ => {
258            result.push_str(&value_to_xml_string(value));
259        }
260    }
261
262    Ok(result)
263}
264
265fn value_to_xml_string(value: &Value) -> String {
266    match value {
267        Value::Null => String::new(),
268        Value::Bool(b) => b.to_string(),
269        Value::Number(n) => n.to_string(),
270        Value::String(s) => escape_xml(s),
271        _ => String::new(),
272    }
273}
274
/// Replace the five XML-reserved characters (`&`, `<`, `>`, `"`, `'`) with
/// their predefined entity references; every other character is copied as-is.
fn escape_xml(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());

    for ch in s.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }

    escaped
}
282
/// Turn an arbitrary string into an ASCII-only XML element name.
///
/// * The first character is kept if it is an ASCII letter or `_`; otherwise
///   `_` is emitted, followed by the character itself when it is an ASCII
///   digit.
/// * Every later character is kept if it is an ASCII letter, digit, `-`, `_`
///   or `.`, and is replaced by `_` otherwise.
/// * An empty input yields `_element`.
fn sanitize_xml_name(name: &str) -> String {
    let mut rest = name.chars();

    let first = match rest.next() {
        Some(c) => c,
        None => return "_element".to_string(),
    };

    let mut sanitized = String::with_capacity(name.len() + 1);

    if first.is_ascii_alphabetic() || first == '_' {
        sanitized.push(first);
    } else {
        // Names may not start with a digit or punctuation: prefix with `_`
        // and keep the digit so the original key stays recognisable.
        sanitized.push('_');
        if first.is_ascii_digit() {
            sanitized.push(first);
        }
    }

    sanitized.extend(rest.map(|c| {
        if c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.') {
            c
        } else {
            '_'
        }
    }));

    sanitized
}
309
// Unit tests for config parsing, the JSON/XML publish entry points, and the
// XML helper functions.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Convert a `serde_json::Value` literal into an `OwnedDataValue`.
    fn dv(v: serde_json::Value) -> OwnedDataValue {
        OwnedDataValue::from(&v)
    }

    /// Build a message from an empty-object payload and set the `data` key of
    /// its context to `initial`.
    fn message_with_data(initial: serde_json::Value) -> Message {
        let mut m = Message::new(Arc::new(dv(json!({}))));
        set_nested_value(&mut m.context, "data", dv(initial));
        m
    }

    // Only the required keys are given: optional fields take their defaults.
    #[test]
    fn test_publish_config_from_json() {
        let input = json!({"source": "output", "target": "json_string"});
        let config = PublishConfig::from_json(&input).unwrap();
        assert_eq!(config.source, "output");
        assert_eq!(config.target, "json_string");
        assert!(!config.pretty);
        assert_eq!(config.root_element, "root");
    }

    // Every key is given explicitly and must be carried through unchanged.
    #[test]
    fn test_publish_config_with_options() {
        let input = json!({
            "source": "data",
            "target": "xml_output",
            "pretty": true,
            "root_element": "document"
        });

        let config = PublishConfig::from_json(&input).unwrap();
        assert_eq!(config.source, "data");
        assert_eq!(config.target, "xml_output");
        assert!(config.pretty);
        assert_eq!(config.root_element, "document");
    }

    // `source` is mandatory.
    #[test]
    fn test_publish_config_missing_source() {
        assert!(PublishConfig::from_json(&json!({"target": "output"})).is_err());
    }

    // `target` is mandatory.
    #[test]
    fn test_publish_config_missing_target() {
        assert!(PublishConfig::from_json(&json!({"source": "input"})).is_err());
    }

    // Compact JSON publish: one change is reported and the string lands in
    // `data.user_json`.
    #[test]
    fn test_execute_publish_json() {
        let mut message = message_with_data(json!({"user": {"name": "John", "age": 30}}));

        let config = PublishConfig {
            source: "user".to_string(),
            target: "user_json".to_string(),
            pretty: false,
            root_element: "root".to_string(),
        };

        let result = execute_publish_json(&mut message, &config);
        assert!(result.is_ok());

        let (outcome, changes) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert_eq!(changes.len(), 1);

        let json_string = message.data()["user_json"].as_str().unwrap();
        assert!(json_string.contains("John"));
        assert!(json_string.contains("30"));
    }

    // Pretty JSON publish: the output is expected to span multiple lines.
    #[test]
    fn test_execute_publish_json_pretty() {
        let mut message = message_with_data(json!({"user": {"name": "Alice"}}));

        let config = PublishConfig {
            source: "user".to_string(),
            target: "output".to_string(),
            pretty: true,
            root_element: "root".to_string(),
        };

        let result = execute_publish_json(&mut message, &config);
        assert!(result.is_ok());

        let json_string = message.data()["output"].as_str().unwrap();
        assert!(json_string.contains('\n'));
    }

    // A source path that resolves to nothing is an error for JSON publish.
    #[test]
    fn test_execute_publish_json_not_found() {
        let mut message = Message::new(Arc::new(dv(json!({}))));

        let config = PublishConfig {
            source: "nonexistent".to_string(),
            target: "output".to_string(),
            pretty: false,
            root_element: "root".to_string(),
        };

        assert!(execute_publish_json(&mut message, &config).is_err());
    }

    // XML publish: the configured root element wraps the serialised fields.
    #[test]
    fn test_execute_publish_xml() {
        let mut message = message_with_data(json!({"user": {"name": "John", "age": 30}}));

        let config = PublishConfig {
            source: "user".to_string(),
            target: "user_xml".to_string(),
            pretty: false,
            root_element: "user".to_string(),
        };

        let result = execute_publish_xml(&mut message, &config);
        assert!(result.is_ok());

        let (outcome, _) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);

        let xml_string = message.data()["user_xml"].as_str().unwrap();
        assert!(xml_string.contains("<user>"));
        assert!(xml_string.contains("</user>"));
        assert!(xml_string.contains("<name>John</name>"));
    }

    // A source path that resolves to nothing is an error for XML publish.
    #[test]
    fn test_execute_publish_xml_not_found() {
        let mut message = Message::new(Arc::new(dv(json!({}))));

        let config = PublishConfig {
            source: "nonexistent".to_string(),
            target: "output".to_string(),
            pretty: false,
            root_element: "root".to_string(),
        };

        assert!(execute_publish_xml(&mut message, &config).is_err());
    }

    // Flat object: each key becomes a child element of the root.
    #[test]
    fn test_json_to_xml_simple() {
        let value = json!({"name": "Test", "value": 42});
        let xml = json_to_xml(&value, "root").unwrap();
        assert!(xml.contains("<root>"));
        assert!(xml.contains("</root>"));
        assert!(xml.contains("<name>Test</name>"));
        assert!(xml.contains("<value>42</value>"));
    }

    // Nested object: inner keys are emitted inside the outer key's element.
    #[test]
    fn test_json_to_xml_nested() {
        let value = json!({"user": {"name": "Alice", "email": "alice@example.com"}});
        let xml = json_to_xml(&value, "data").unwrap();
        assert!(xml.contains("<data>"));
        assert!(xml.contains("<user>"));
        assert!(xml.contains("<name>Alice</name>"));
    }

    // Top-level array: each entry is wrapped in an `<item>` element.
    #[test]
    fn test_json_to_xml_array() {
        let value = json!([1, 2, 3]);
        let xml = json_to_xml(&value, "numbers").unwrap();
        assert!(xml.contains("<numbers>"));
        assert!(xml.contains("<item>1</item>"));
        assert!(xml.contains("<item>2</item>"));
        assert!(xml.contains("<item>3</item>"));
    }

    // Markup inside string values must be escaped, never emitted as tags.
    #[test]
    fn test_json_to_xml_special_chars() {
        let value = json!({"text": "<script>alert('xss')</script>"});
        let xml = json_to_xml(&value, "root").unwrap();
        assert!(xml.contains("&lt;script&gt;"));
        assert!(!xml.contains("<script>"));
    }

    // Direct checks of the entity replacements.
    #[test]
    fn test_escape_xml() {
        assert_eq!(escape_xml("hello"), "hello");
        assert_eq!(escape_xml("<tag>"), "&lt;tag&gt;");
        assert_eq!(escape_xml("a & b"), "a &amp; b");
        assert_eq!(escape_xml("\"quoted\""), "&quot;quoted&quot;");
    }

    // Element-name sanitisation: leading digits, spaces, and empty input.
    #[test]
    fn test_sanitize_xml_name() {
        assert_eq!(sanitize_xml_name("valid"), "valid");
        assert_eq!(sanitize_xml_name("_valid"), "_valid");
        assert_eq!(sanitize_xml_name("123invalid"), "_123invalid");
        assert_eq!(sanitize_xml_name("has spaces"), "has_spaces");
        assert_eq!(sanitize_xml_name("has-dash"), "has-dash");
        assert_eq!(sanitize_xml_name(""), "_element");
    }

    // A dotted `source` is resolved as a nested path inside `data`.
    #[test]
    fn test_execute_publish_json_nested_source() {
        let mut message = message_with_data(json!({
            "response": {"body": {"message": "success"}}
        }));

        let config = PublishConfig {
            source: "response.body".to_string(),
            target: "output".to_string(),
            pretty: false,
            root_element: "root".to_string(),
        };

        let result = execute_publish_json(&mut message, &config);
        assert!(result.is_ok());

        let json_string = message.data()["output"].as_str().unwrap();
        assert!(json_string.contains("success"));
    }
}