Skip to main content

dataflow_rs/engine/functions/
publish.rs

//! # Publish Function Module
//!
//! This module serializes structured context data into string formats
//! (JSON or XML). It is typically used at the end of a workflow to prepare
//! output data for transmission or storage.
//!
//! ## Features
//!
//! - Serialize a data field to a JSON string
//! - Serialize a data field to an XML string
//! - Support for nested source paths
//! - Automatic change tracking for audit trails
//!
//! ## Example Usage
//!
//! ```json
//! {
//!     "name": "publish_json",
//!     "input": {
//!         "source": "output",
//!         "target": "json_string"
//!     }
//! }
//! ```
25
26use crate::engine::error::{DataflowError, Result};
27use crate::engine::message::{Change, Message};
28use crate::engine::utils::get_nested_value;
29use log::debug;
30use serde::Deserialize;
31use serde_json::Value;
32use std::sync::Arc;
33
34/// Configuration for publish functions.
35///
36/// Specifies where to read the source data from and where to store
37/// the serialized result in the data context.
38#[derive(Debug, Clone, Deserialize)]
39pub struct PublishConfig {
40    /// Source field name in data to serialize.
41    /// - "field_name" - Read from data.field_name
42    /// - "nested.field" - Read from data.nested.field
43    pub source: String,
44
45    /// Target field name in data where the serialized string will be stored.
46    /// The result is stored at `data.{target}`.
47    pub target: String,
48
49    /// Whether to pretty-print the output (for JSON only).
50    /// Defaults to false.
51    #[serde(default)]
52    pub pretty: bool,
53
54    /// Root element name for XML output.
55    /// Defaults to "root".
56    #[serde(default = "default_root_element")]
57    pub root_element: String,
58}
59
/// Fallback XML root element name used when the config does not supply one.
fn default_root_element() -> String {
    String::from("root")
}
63
64impl PublishConfig {
65    /// Parses a `PublishConfig` from a JSON value.
66    ///
67    /// # Arguments
68    /// * `input` - JSON object containing "source" and "target" fields
69    ///
70    /// # Errors
71    /// Returns `DataflowError::Validation` if required fields are missing
72    pub fn from_json(input: &Value) -> Result<Self> {
73        let source = input
74            .get("source")
75            .and_then(Value::as_str)
76            .ok_or_else(|| {
77                DataflowError::Validation("Missing 'source' in publish config".to_string())
78            })?
79            .to_string();
80
81        let target = input
82            .get("target")
83            .and_then(Value::as_str)
84            .ok_or_else(|| {
85                DataflowError::Validation("Missing 'target' in publish config".to_string())
86            })?
87            .to_string();
88
89        let pretty = input
90            .get("pretty")
91            .and_then(Value::as_bool)
92            .unwrap_or(false);
93
94        let root_element = input
95            .get("root_element")
96            .and_then(Value::as_str)
97            .map(String::from)
98            .unwrap_or_else(default_root_element);
99
100        Ok(PublishConfig {
101            source,
102            target,
103            pretty,
104            root_element,
105        })
106    }
107
108    /// Extract source data from the data context.
109    ///
110    /// # Arguments
111    /// * `message` - The message to extract data from
112    ///
113    /// # Returns
114    /// The extracted value, or Value::Null if not found
115    fn extract_source(&self, message: &Message) -> Value {
116        // Check if source is a direct field in data
117        if let Some(value) = message.data().get(&self.source) {
118            return value.clone();
119        }
120
121        // Try nested path in data
122        if let Some(value) = get_nested_value(message.data(), &self.source) {
123            return value.clone();
124        }
125
126        // Try full context path (for data.field syntax)
127        if let Some(path) = self.source.strip_prefix("data.")
128            && let Some(value) = get_nested_value(message.data(), path)
129        {
130            return value.clone();
131        }
132
133        Value::Null
134    }
135}
136
137/// Execute publish_json operation.
138///
139/// Serializes data from the source path to a JSON string and stores it
140/// in the target data field.
141///
142/// # Arguments
143/// * `message` - The message to process (modified in place)
144/// * `config` - Publish configuration specifying source and target
145///
146/// # Returns
147/// * `Ok((200, changes))` - Success with list of changes for audit trail
148/// * `Err` - If configuration is invalid or serialization fails
149pub fn execute_publish_json(
150    message: &mut Message,
151    config: &PublishConfig,
152) -> Result<(usize, Vec<Change>)> {
153    debug!(
154        "PublishJson: Serializing 'data.{}' to 'data.{}'",
155        config.source, config.target
156    );
157
158    // Extract source data
159    let source_data = config.extract_source(message);
160
161    if source_data.is_null() {
162        return Err(DataflowError::Validation(format!(
163            "PublishJson: Source 'data.{}' not found or is null",
164            config.source
165        )));
166    }
167
168    // Serialize to JSON string
169    let json_string = if config.pretty {
170        serde_json::to_string_pretty(&source_data)
171    } else {
172        serde_json::to_string(&source_data)
173    }
174    .map_err(|e| DataflowError::Validation(format!("Failed to serialize to JSON: {}", e)))?;
175
176    // Get old value for change tracking
177    let old_value = message
178        .data()
179        .get(&config.target)
180        .cloned()
181        .unwrap_or(Value::Null);
182
183    let new_value = Value::String(json_string);
184
185    // Store to target in data
186    if let Some(data_obj) = message.data_mut().as_object_mut() {
187        data_obj.insert(config.target.clone(), new_value.clone());
188    }
189
190    // Invalidate context cache
191    message.invalidate_context_cache();
192
193    debug!(
194        "PublishJson: Successfully serialized to 'data.{}'",
195        config.target
196    );
197
198    Ok((
199        200,
200        vec![Change {
201            path: Arc::from(format!("data.{}", config.target)),
202            old_value: Arc::new(old_value),
203            new_value: Arc::new(new_value),
204        }],
205    ))
206}
207
208/// Execute publish_xml operation.
209///
210/// Serializes data from the source path to an XML string and stores it
211/// in the target data field.
212///
213/// # Arguments
214/// * `message` - The message to process (modified in place)
215/// * `config` - Publish configuration specifying source and target
216///
217/// # Returns
218/// * `Ok((200, changes))` - Success with list of changes for audit trail
219/// * `Err` - If configuration is invalid or serialization fails
220pub fn execute_publish_xml(
221    message: &mut Message,
222    config: &PublishConfig,
223) -> Result<(usize, Vec<Change>)> {
224    debug!(
225        "PublishXml: Serializing 'data.{}' to 'data.{}'",
226        config.source, config.target
227    );
228
229    // Extract source data
230    let source_data = config.extract_source(message);
231
232    if source_data.is_null() {
233        return Err(DataflowError::Validation(format!(
234            "PublishXml: Source 'data.{}' not found or is null",
235            config.source
236        )));
237    }
238
239    // Serialize to XML string
240    let xml_string = json_to_xml(&source_data, &config.root_element)?;
241
242    // Get old value for change tracking
243    let old_value = message
244        .data()
245        .get(&config.target)
246        .cloned()
247        .unwrap_or(Value::Null);
248
249    let new_value = Value::String(xml_string);
250
251    // Store to target in data
252    if let Some(data_obj) = message.data_mut().as_object_mut() {
253        data_obj.insert(config.target.clone(), new_value.clone());
254    }
255
256    // Invalidate context cache
257    message.invalidate_context_cache();
258
259    debug!(
260        "PublishXml: Successfully serialized to 'data.{}'",
261        config.target
262    );
263
264    Ok((
265        200,
266        vec![Change {
267            path: Arc::from(format!("data.{}", config.target)),
268            old_value: Arc::new(old_value),
269            new_value: Arc::new(new_value),
270        }],
271    ))
272}
273
274/// Convert JSON Value to XML string.
275///
276/// Uses a recursive approach to convert JSON to XML.
277fn json_to_xml(value: &Value, root_element: &str) -> Result<String> {
278    let mut buffer = String::new();
279
280    // For objects, serialize directly with root element
281    match value {
282        Value::Object(_) => {
283            // Create XML with custom root element
284            buffer.push_str(&format!("<{}>", root_element));
285
286            // Serialize the object contents
287            let content = serialize_value_to_xml_content(value)?;
288            buffer.push_str(&content);
289
290            buffer.push_str(&format!("</{}>", root_element));
291        }
292        Value::Array(arr) => {
293            // For arrays, wrap each item
294            buffer.push_str(&format!("<{}>", root_element));
295            for item in arr {
296                buffer.push_str("<item>");
297                let content = serialize_value_to_xml_content(item)?;
298                buffer.push_str(&content);
299                buffer.push_str("</item>");
300            }
301            buffer.push_str(&format!("</{}>", root_element));
302        }
303        _ => {
304            // For primitives, wrap in root element
305            buffer.push_str(&format!("<{}>", root_element));
306            buffer.push_str(&value_to_xml_string(value));
307            buffer.push_str(&format!("</{}>", root_element));
308        }
309    }
310
311    Ok(buffer)
312}
313
314/// Serialize a JSON value's content to XML (without root wrapper).
315fn serialize_value_to_xml_content(value: &Value) -> Result<String> {
316    let mut result = String::new();
317
318    match value {
319        Value::Object(map) => {
320            for (key, val) in map {
321                // Sanitize key for XML element name
322                let safe_key = sanitize_xml_name(key);
323                result.push_str(&format!("<{}>", safe_key));
324
325                match val {
326                    Value::Object(_) | Value::Array(_) => {
327                        result.push_str(&serialize_value_to_xml_content(val)?);
328                    }
329                    _ => {
330                        result.push_str(&value_to_xml_string(val));
331                    }
332                }
333
334                result.push_str(&format!("</{}>", safe_key));
335            }
336        }
337        Value::Array(arr) => {
338            for item in arr {
339                result.push_str("<item>");
340                match item {
341                    Value::Object(_) | Value::Array(_) => {
342                        result.push_str(&serialize_value_to_xml_content(item)?);
343                    }
344                    _ => {
345                        result.push_str(&value_to_xml_string(item));
346                    }
347                }
348                result.push_str("</item>");
349            }
350        }
351        _ => {
352            result.push_str(&value_to_xml_string(value));
353        }
354    }
355
356    Ok(result)
357}
358
359/// Convert a primitive JSON value to an XML-safe string.
360fn value_to_xml_string(value: &Value) -> String {
361    match value {
362        Value::Null => String::new(),
363        Value::Bool(b) => b.to_string(),
364        Value::Number(n) => n.to_string(),
365        Value::String(s) => escape_xml(s),
366        _ => String::new(),
367    }
368}
369
/// Replace the five XML special characters with their predefined entities.
///
/// `&`, `<`, `>`, `"` and `'` become `&amp;`, `&lt;`, `&gt;`, `&quot;` and
/// `&apos;`; every other character is copied through unchanged.
fn escape_xml(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());

    for ch in s.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }

    escaped
}
378
/// Turn an arbitrary string into an ASCII-only XML element name.
///
/// * A leading ASCII letter or underscore is kept; a leading digit is kept
///   but prefixed with `_`; any other leading character becomes `_`.
/// * Later characters are kept when they are ASCII alphanumeric, `-`, `_`
///   or `.`; everything else becomes `_`.
/// * An empty input yields `"_element"`.
fn sanitize_xml_name(name: &str) -> String {
    let mut rest = name.chars();

    let first = match rest.next() {
        Some(c) => c,
        None => return "_element".to_string(),
    };

    let mut cleaned = String::new();

    if first.is_ascii_alphabetic() || first == '_' {
        cleaned.push(first);
    } else {
        cleaned.push('_');
        // Only digits can reach this point and still be alphanumeric.
        if first.is_ascii_digit() {
            cleaned.push(first);
        }
    }

    cleaned.extend(rest.map(|c| match c {
        '-' | '_' | '.' => c,
        c if c.is_ascii_alphanumeric() => c,
        _ => '_',
    }));

    cleaned
}
410
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A fresh message whose data context has not been populated.
    fn empty_message() -> Message {
        Message::new(Arc::new(json!({})))
    }

    /// A fresh message whose `data` context is replaced by `data`.
    fn message_with_data(data: Value) -> Message {
        let mut message = empty_message();
        message.context["data"] = data;
        message
    }

    /// Shorthand for building a `PublishConfig` literal.
    fn publish_config(
        source: &str,
        target: &str,
        pretty: bool,
        root_element: &str,
    ) -> PublishConfig {
        PublishConfig {
            source: source.to_string(),
            target: target.to_string(),
            pretty,
            root_element: root_element.to_string(),
        }
    }

    #[test]
    fn test_publish_config_from_json() {
        let parsed = PublishConfig::from_json(&json!({
            "source": "output",
            "target": "json_string"
        }))
        .unwrap();

        assert_eq!(parsed.source, "output");
        assert_eq!(parsed.target, "json_string");
        assert!(!parsed.pretty);
        assert_eq!(parsed.root_element, "root");
    }

    #[test]
    fn test_publish_config_with_options() {
        let parsed = PublishConfig::from_json(&json!({
            "source": "data",
            "target": "xml_output",
            "pretty": true,
            "root_element": "document"
        }))
        .unwrap();

        assert_eq!(parsed.source, "data");
        assert_eq!(parsed.target, "xml_output");
        assert!(parsed.pretty);
        assert_eq!(parsed.root_element, "document");
    }

    #[test]
    fn test_publish_config_missing_source() {
        let outcome = PublishConfig::from_json(&json!({ "target": "output" }));
        assert!(outcome.is_err());
    }

    #[test]
    fn test_publish_config_missing_target() {
        let outcome = PublishConfig::from_json(&json!({ "source": "input" }));
        assert!(outcome.is_err());
    }

    #[test]
    fn test_execute_publish_json() {
        let mut message = message_with_data(json!({
            "user": {
                "name": "John",
                "age": 30
            }
        }));
        let config = publish_config("user", "user_json", false, "root");

        let outcome = execute_publish_json(&mut message, &config);
        assert!(outcome.is_ok());

        let (status, changes) = outcome.unwrap();
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);

        // The serialized string must be present in the target field.
        let json_string = message.data()["user_json"].as_str().unwrap();
        assert!(json_string.contains("John"));
        assert!(json_string.contains("30"));
    }

    #[test]
    fn test_execute_publish_json_pretty() {
        let mut message = message_with_data(json!({
            "user": {
                "name": "Alice"
            }
        }));
        let config = publish_config("user", "output", true, "root");

        let outcome = execute_publish_json(&mut message, &config);
        assert!(outcome.is_ok());

        // Pretty-printed JSON spans several lines.
        let json_string = message.data()["output"].as_str().unwrap();
        assert!(json_string.contains('\n'));
    }

    #[test]
    fn test_execute_publish_json_not_found() {
        let mut message = empty_message();
        let config = publish_config("nonexistent", "output", false, "root");

        let outcome = execute_publish_json(&mut message, &config);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_execute_publish_xml() {
        let mut message = message_with_data(json!({
            "user": {
                "name": "John",
                "age": 30
            }
        }));
        let config = publish_config("user", "user_xml", false, "user");

        let outcome = execute_publish_xml(&mut message, &config);
        assert!(outcome.is_ok());

        let (status, _) = outcome.unwrap();
        assert_eq!(status, 200);

        // The serialized string must be present in the target field.
        let xml_string = message.data()["user_xml"].as_str().unwrap();
        for fragment in ["<user>", "</user>", "<name>John</name>"] {
            assert!(xml_string.contains(fragment));
        }
    }

    #[test]
    fn test_execute_publish_xml_not_found() {
        let mut message = empty_message();
        let config = publish_config("nonexistent", "output", false, "root");

        let outcome = execute_publish_xml(&mut message, &config);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_json_to_xml_simple() {
        let outcome = json_to_xml(
            &json!({
                "name": "Test",
                "value": 42
            }),
            "root",
        );
        assert!(outcome.is_ok());

        let xml = outcome.unwrap();
        for fragment in [
            "<root>",
            "</root>",
            "<name>Test</name>",
            "<value>42</value>",
        ] {
            assert!(xml.contains(fragment));
        }
    }

    #[test]
    fn test_json_to_xml_nested() {
        let outcome = json_to_xml(
            &json!({
                "user": {
                    "name": "Alice",
                    "email": "alice@example.com"
                }
            }),
            "data",
        );
        assert!(outcome.is_ok());

        let xml = outcome.unwrap();
        for fragment in ["<data>", "<user>", "<name>Alice</name>"] {
            assert!(xml.contains(fragment));
        }
    }

    #[test]
    fn test_json_to_xml_array() {
        let outcome = json_to_xml(&json!([1, 2, 3]), "numbers");
        assert!(outcome.is_ok());

        let xml = outcome.unwrap();
        for fragment in [
            "<numbers>",
            "<item>1</item>",
            "<item>2</item>",
            "<item>3</item>",
        ] {
            assert!(xml.contains(fragment));
        }
    }

    #[test]
    fn test_json_to_xml_special_chars() {
        let outcome = json_to_xml(
            &json!({
                "text": "<script>alert('xss')</script>"
            }),
            "root",
        );
        assert!(outcome.is_ok());

        // Markup inside text must come out escaped, never verbatim.
        let xml = outcome.unwrap();
        assert!(xml.contains("&lt;script&gt;"));
        assert!(!xml.contains("<script>"));
    }

    #[test]
    fn test_escape_xml() {
        let cases = [
            ("hello", "hello"),
            ("<tag>", "&lt;tag&gt;"),
            ("a & b", "a &amp; b"),
            ("\"quoted\"", "&quot;quoted&quot;"),
        ];

        for (raw, expected) in cases {
            assert_eq!(escape_xml(raw), expected);
        }
    }

    #[test]
    fn test_sanitize_xml_name() {
        let cases = [
            ("valid", "valid"),
            ("_valid", "_valid"),
            ("123invalid", "_123invalid"),
            ("has spaces", "has_spaces"),
            ("has-dash", "has-dash"),
            ("", "_element"),
        ];

        for (raw, expected) in cases {
            assert_eq!(sanitize_xml_name(raw), expected);
        }
    }

    #[test]
    fn test_execute_publish_json_nested_source() {
        let mut message = message_with_data(json!({
            "response": {
                "body": {
                    "message": "success"
                }
            }
        }));
        let config = publish_config("response.body", "output", false, "root");

        let outcome = execute_publish_json(&mut message, &config);
        assert!(outcome.is_ok());

        let json_string = message.data()["output"].as_str().unwrap();
        assert!(json_string.contains("success"));
    }
}