dataflow_rs/engine/functions/
publish.rs1use crate::engine::error::{DataflowError, Result};
9use crate::engine::message::{Change, Message};
10use crate::engine::task_outcome::TaskOutcome;
11use crate::engine::utils::{get_nested_value, set_nested_value};
12use datavalue::OwnedDataValue;
13use log::debug;
14use serde::Deserialize;
15use serde_json::Value;
16use std::sync::Arc;
17
18#[derive(Debug, Clone, Deserialize)]
20pub struct PublishConfig {
21 pub source: String,
23
24 pub target: String,
26
27 #[serde(default)]
29 pub pretty: bool,
30
31 #[serde(default = "default_root_element")]
33 pub root_element: String,
34}
35
/// Root element name used when the configuration does not provide `root_element`.
fn default_root_element() -> String {
    String::from("root")
}
39
40impl PublishConfig {
41 pub fn from_json(input: &Value) -> Result<Self> {
42 let source = input
43 .get("source")
44 .and_then(Value::as_str)
45 .ok_or_else(|| {
46 DataflowError::Validation("Missing 'source' in publish config".to_string())
47 })?
48 .to_string();
49
50 let target = input
51 .get("target")
52 .and_then(Value::as_str)
53 .ok_or_else(|| {
54 DataflowError::Validation("Missing 'target' in publish config".to_string())
55 })?
56 .to_string();
57
58 let pretty = input
59 .get("pretty")
60 .and_then(Value::as_bool)
61 .unwrap_or(false);
62
63 let root_element = input
64 .get("root_element")
65 .and_then(Value::as_str)
66 .map(String::from)
67 .unwrap_or_else(default_root_element);
68
69 Ok(PublishConfig {
70 source,
71 target,
72 pretty,
73 root_element,
74 })
75 }
76
77 fn extract_source(&self, message: &Message) -> OwnedDataValue {
79 if let Some(value) = message.data().get(&self.source) {
81 return value.clone();
82 }
83
84 if let Some(value) = get_nested_value(message.data(), &self.source) {
86 return value.clone();
87 }
88
89 if let Some(path) = self.source.strip_prefix("data.")
91 && let Some(value) = get_nested_value(message.data(), path)
92 {
93 return value.clone();
94 }
95
96 OwnedDataValue::Null
97 }
98}
99
100pub fn execute_publish_json(
103 message: &mut Message,
104 config: &PublishConfig,
105) -> Result<(TaskOutcome, Vec<Change>)> {
106 debug!(
107 "PublishJson: Serializing 'data.{}' to 'data.{}'",
108 config.source, config.target
109 );
110
111 let source_data = config.extract_source(message);
112
113 if matches!(source_data, OwnedDataValue::Null) {
114 return Err(DataflowError::Validation(format!(
115 "PublishJson: Source 'data.{}' not found or is null",
116 config.source
117 )));
118 }
119
120 let json_string = if config.pretty {
124 let bridge = Value::from(&source_data);
125 serde_json::to_string_pretty(&bridge)
126 .map_err(|e| DataflowError::Validation(format!("Failed to serialize to JSON: {}", e)))?
127 } else {
128 source_data.to_json_string()
129 };
130
131 let target_path = format!("data.{}", config.target);
132 let old_value = get_nested_value(&message.context, &target_path)
133 .cloned()
134 .unwrap_or(OwnedDataValue::Null);
135 let new_value = OwnedDataValue::String(json_string);
136
137 set_nested_value(&mut message.context, &target_path, new_value.clone());
138
139 Ok((
140 TaskOutcome::Success,
141 vec![Change {
142 path: Arc::from(target_path),
143 old_value,
144 new_value,
145 }],
146 ))
147}
148
149pub fn execute_publish_xml(
153 message: &mut Message,
154 config: &PublishConfig,
155) -> Result<(TaskOutcome, Vec<Change>)> {
156 debug!(
157 "PublishXml: Serializing 'data.{}' to 'data.{}'",
158 config.source, config.target
159 );
160
161 let source_data = config.extract_source(message);
162
163 if matches!(source_data, OwnedDataValue::Null) {
164 return Err(DataflowError::Validation(format!(
165 "PublishXml: Source 'data.{}' not found or is null",
166 config.source
167 )));
168 }
169
170 let bridge = Value::from(&source_data);
171 let xml_string = json_to_xml(&bridge, &config.root_element)?;
172
173 let target_path = format!("data.{}", config.target);
174 let old_value = get_nested_value(&message.context, &target_path)
175 .cloned()
176 .unwrap_or(OwnedDataValue::Null);
177 let new_value = OwnedDataValue::String(xml_string);
178
179 set_nested_value(&mut message.context, &target_path, new_value.clone());
180
181 Ok((
182 TaskOutcome::Success,
183 vec![Change {
184 path: Arc::from(target_path),
185 old_value,
186 new_value,
187 }],
188 ))
189}
190
191fn json_to_xml(value: &Value, root_element: &str) -> Result<String> {
195 let mut buffer = String::new();
196
197 match value {
198 Value::Object(_) => {
199 buffer.push_str(&format!("<{}>", root_element));
200 let content = serialize_value_to_xml_content(value)?;
201 buffer.push_str(&content);
202 buffer.push_str(&format!("</{}>", root_element));
203 }
204 Value::Array(arr) => {
205 buffer.push_str(&format!("<{}>", root_element));
206 for item in arr {
207 buffer.push_str("<item>");
208 let content = serialize_value_to_xml_content(item)?;
209 buffer.push_str(&content);
210 buffer.push_str("</item>");
211 }
212 buffer.push_str(&format!("</{}>", root_element));
213 }
214 _ => {
215 buffer.push_str(&format!("<{}>", root_element));
216 buffer.push_str(&value_to_xml_string(value));
217 buffer.push_str(&format!("</{}>", root_element));
218 }
219 }
220
221 Ok(buffer)
222}
223
224fn serialize_value_to_xml_content(value: &Value) -> Result<String> {
225 let mut result = String::new();
226
227 match value {
228 Value::Object(map) => {
229 for (key, val) in map {
230 let safe_key = sanitize_xml_name(key);
231 result.push_str(&format!("<{}>", safe_key));
232 match val {
233 Value::Object(_) | Value::Array(_) => {
234 result.push_str(&serialize_value_to_xml_content(val)?);
235 }
236 _ => {
237 result.push_str(&value_to_xml_string(val));
238 }
239 }
240 result.push_str(&format!("</{}>", safe_key));
241 }
242 }
243 Value::Array(arr) => {
244 for item in arr {
245 result.push_str("<item>");
246 match item {
247 Value::Object(_) | Value::Array(_) => {
248 result.push_str(&serialize_value_to_xml_content(item)?);
249 }
250 _ => {
251 result.push_str(&value_to_xml_string(item));
252 }
253 }
254 result.push_str("</item>");
255 }
256 }
257 _ => {
258 result.push_str(&value_to_xml_string(value));
259 }
260 }
261
262 Ok(result)
263}
264
265fn value_to_xml_string(value: &Value) -> String {
266 match value {
267 Value::Null => String::new(),
268 Value::Bool(b) => b.to_string(),
269 Value::Number(n) => n.to_string(),
270 Value::String(s) => escape_xml(s),
271 _ => String::new(),
272 }
273}
274
/// Escapes the five XML special characters so `s` can be embedded as text
/// (or attribute) content.
///
/// `&` → `&amp;`, `<` → `&lt;`, `>` → `&gt;`, `"` → `&quot;`, `'` → `&apos;`.
/// All other characters, including non-ASCII ones, are copied unchanged.
///
/// A single pass over the input avoids both repeated allocation and the
/// ordering hazard of chained `replace` calls (where `&` must go first).
fn escape_xml(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}
282
/// Converts an arbitrary string into an XML element name.
///
/// Only ASCII name characters are kept:
/// - The first character must be an ASCII letter or `_`. Otherwise `_` is
///   prepended, and the original character is kept only if it is an ASCII digit.
/// - Later characters may be ASCII alphanumerics, `-`, `_` or `.`. Anything
///   else, including non-ASCII characters, becomes `_`.
///
/// An empty input yields `"_element"`.
fn sanitize_xml_name(name: &str) -> String {
    let mut chars = name.chars();
    let Some(first) = chars.next() else {
        return "_element".to_string();
    };

    let mut sanitized = String::with_capacity(name.len() + 1);
    if first.is_ascii_alphabetic() || first == '_' {
        sanitized.push(first);
    } else {
        sanitized.push('_');
        if first.is_ascii_digit() {
            sanitized.push(first);
        }
    }

    sanitized.extend(chars.map(|c| {
        if c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.') {
            c
        } else {
            '_'
        }
    }));
    sanitized
}
309
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Converts a `serde_json::Value` into the engine's data representation.
    fn dv(v: serde_json::Value) -> OwnedDataValue {
        OwnedDataValue::from(&v)
    }

    /// Builds a message whose `data` section is `initial`.
    fn message_with_data(initial: serde_json::Value) -> Message {
        let mut m = Message::new(Arc::new(dv(json!({}))));
        set_nested_value(&mut m.context, "data", dv(initial));
        m
    }

    #[test]
    fn test_publish_config_from_json() {
        let input = json!({"source": "output", "target": "json_string"});
        let config = PublishConfig::from_json(&input).unwrap();
        assert_eq!(config.source, "output");
        assert_eq!(config.target, "json_string");
        assert!(!config.pretty);
        assert_eq!(config.root_element, "root");
    }

    #[test]
    fn test_publish_config_with_options() {
        let input = json!({
            "source": "data",
            "target": "xml_output",
            "pretty": true,
            "root_element": "document"
        });

        let config = PublishConfig::from_json(&input).unwrap();
        assert_eq!(config.source, "data");
        assert_eq!(config.target, "xml_output");
        assert!(config.pretty);
        assert_eq!(config.root_element, "document");
    }

    #[test]
    fn test_publish_config_missing_source() {
        assert!(PublishConfig::from_json(&json!({"target": "output"})).is_err());
    }

    #[test]
    fn test_publish_config_missing_target() {
        assert!(PublishConfig::from_json(&json!({"source": "input"})).is_err());
    }

    #[test]
    fn test_execute_publish_json() {
        let mut message = message_with_data(json!({"user": {"name": "John", "age": 30}}));

        let config = PublishConfig {
            source: "user".to_string(),
            target: "user_json".to_string(),
            pretty: false,
            root_element: "root".to_string(),
        };

        let result = execute_publish_json(&mut message, &config);
        assert!(result.is_ok());

        let (outcome, changes) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert_eq!(changes.len(), 1);
        assert_eq!(&*changes[0].path, "data.user_json");
        assert!(matches!(changes[0].old_value, OwnedDataValue::Null));

        let json_string = message.data()["user_json"].as_str().unwrap();
        assert!(json_string.contains("John"));
        assert!(json_string.contains("30"));
    }

    #[test]
    fn test_execute_publish_json_pretty() {
        let mut message = message_with_data(json!({"user": {"name": "Alice"}}));

        let config = PublishConfig {
            source: "user".to_string(),
            target: "output".to_string(),
            pretty: true,
            root_element: "root".to_string(),
        };

        let result = execute_publish_json(&mut message, &config);
        assert!(result.is_ok());

        let json_string = message.data()["output"].as_str().unwrap();
        assert!(json_string.contains('\n'));
    }

    #[test]
    fn test_execute_publish_json_not_found() {
        let mut message = Message::new(Arc::new(dv(json!({}))));

        let config = PublishConfig {
            source: "nonexistent".to_string(),
            target: "output".to_string(),
            pretty: false,
            root_element: "root".to_string(),
        };

        assert!(execute_publish_json(&mut message, &config).is_err());
    }

    #[test]
    fn test_execute_publish_xml() {
        let mut message = message_with_data(json!({"user": {"name": "John", "age": 30}}));

        let config = PublishConfig {
            source: "user".to_string(),
            target: "user_xml".to_string(),
            pretty: false,
            root_element: "user".to_string(),
        };

        let result = execute_publish_xml(&mut message, &config);
        assert!(result.is_ok());

        let (outcome, _) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);

        let xml_string = message.data()["user_xml"].as_str().unwrap();
        assert!(xml_string.contains("<user>"));
        assert!(xml_string.contains("</user>"));
        assert!(xml_string.contains("<name>John</name>"));
    }

    #[test]
    fn test_execute_publish_xml_not_found() {
        let mut message = Message::new(Arc::new(dv(json!({}))));

        let config = PublishConfig {
            source: "nonexistent".to_string(),
            target: "output".to_string(),
            pretty: false,
            root_element: "root".to_string(),
        };

        assert!(execute_publish_xml(&mut message, &config).is_err());
    }

    #[test]
    fn test_json_to_xml_simple() {
        let value = json!({"name": "Test", "value": 42});
        let xml = json_to_xml(&value, "root").unwrap();
        assert!(xml.contains("<root>"));
        assert!(xml.contains("</root>"));
        assert!(xml.contains("<name>Test</name>"));
        assert!(xml.contains("<value>42</value>"));
    }

    #[test]
    fn test_json_to_xml_nested() {
        let value = json!({"user": {"name": "Alice", "email": "alice@example.com"}});
        let xml = json_to_xml(&value, "data").unwrap();
        assert!(xml.contains("<data>"));
        assert!(xml.contains("<user>"));
        assert!(xml.contains("<name>Alice</name>"));
    }

    #[test]
    fn test_json_to_xml_array() {
        let value = json!([1, 2, 3]);
        let xml = json_to_xml(&value, "numbers").unwrap();
        assert!(xml.contains("<numbers>"));
        assert!(xml.contains("<item>1</item>"));
        assert!(xml.contains("<item>2</item>"));
        assert!(xml.contains("<item>3</item>"));
    }

    #[test]
    fn test_json_to_xml_array_in_object() {
        let value = json!({"tags": ["a", "b"], "meta": null});
        let xml = json_to_xml(&value, "root").unwrap();
        assert!(xml.contains("<tags><item>a</item><item>b</item></tags>"));
        assert!(xml.contains("<meta></meta>"));
    }

    #[test]
    fn test_json_to_xml_special_chars() {
        let value = json!({"text": "<script>alert('xss')</script>"});
        let xml = json_to_xml(&value, "root").unwrap();
        // Markup in text content must come out entity-escaped, never raw.
        assert!(xml.contains("&lt;script&gt;"));
        assert!(!xml.contains("<script>"));
    }

    #[test]
    fn test_escape_xml() {
        assert_eq!(escape_xml("hello"), "hello");
        assert_eq!(escape_xml("<tag>"), "&lt;tag&gt;");
        assert_eq!(escape_xml("a & b"), "a &amp; b");
        assert_eq!(escape_xml("\"quoted\""), "&quot;quoted&quot;");
        assert_eq!(escape_xml("it's"), "it&apos;s");
        // Already-escaped input is escaped again, not passed through.
        assert_eq!(escape_xml("&lt;"), "&amp;lt;");
    }

    #[test]
    fn test_sanitize_xml_name() {
        assert_eq!(sanitize_xml_name("valid"), "valid");
        assert_eq!(sanitize_xml_name("_valid"), "_valid");
        assert_eq!(sanitize_xml_name("123invalid"), "_123invalid");
        assert_eq!(sanitize_xml_name("has spaces"), "has_spaces");
        assert_eq!(sanitize_xml_name("has-dash"), "has-dash");
        assert_eq!(sanitize_xml_name(""), "_element");
        assert_eq!(sanitize_xml_name("-x"), "_x");
        assert_eq!(sanitize_xml_name("a.b"), "a.b");
    }

    #[test]
    fn test_execute_publish_json_nested_source() {
        let mut message = message_with_data(json!({
            "response": {"body": {"message": "success"}}
        }));

        let config = PublishConfig {
            source: "response.body".to_string(),
            target: "output".to_string(),
            pretty: false,
            root_element: "root".to_string(),
        };

        let result = execute_publish_json(&mut message, &config);
        assert!(result.is_ok());

        let json_string = message.data()["output"].as_str().unwrap();
        assert!(json_string.contains("success"));
    }
}