// dataflow_rs/engine/functions/publish.rs
use crate::engine::error::{DataflowError, Result};
27use crate::engine::message::{Change, Message};
28use crate::engine::utils::get_nested_value;
29use log::debug;
30use serde::Deserialize;
31use serde_json::Value;
32use std::sync::Arc;
33
34#[derive(Debug, Clone, Deserialize)]
39pub struct PublishConfig {
40 pub source: String,
44
45 pub target: String,
48
49 #[serde(default)]
52 pub pretty: bool,
53
54 #[serde(default = "default_root_element")]
57 pub root_element: String,
58}
59
/// Fallback XML root element name used when the config does not supply one.
fn default_root_element() -> String {
    String::from("root")
}
63
64impl PublishConfig {
65 pub fn from_json(input: &Value) -> Result<Self> {
73 let source = input
74 .get("source")
75 .and_then(Value::as_str)
76 .ok_or_else(|| {
77 DataflowError::Validation("Missing 'source' in publish config".to_string())
78 })?
79 .to_string();
80
81 let target = input
82 .get("target")
83 .and_then(Value::as_str)
84 .ok_or_else(|| {
85 DataflowError::Validation("Missing 'target' in publish config".to_string())
86 })?
87 .to_string();
88
89 let pretty = input
90 .get("pretty")
91 .and_then(Value::as_bool)
92 .unwrap_or(false);
93
94 let root_element = input
95 .get("root_element")
96 .and_then(Value::as_str)
97 .map(String::from)
98 .unwrap_or_else(default_root_element);
99
100 Ok(PublishConfig {
101 source,
102 target,
103 pretty,
104 root_element,
105 })
106 }
107
108 fn extract_source(&self, message: &Message) -> Value {
116 if let Some(value) = message.data().get(&self.source) {
118 return value.clone();
119 }
120
121 if let Some(value) = get_nested_value(message.data(), &self.source) {
123 return value.clone();
124 }
125
126 if let Some(path) = self.source.strip_prefix("data.")
128 && let Some(value) = get_nested_value(message.data(), path)
129 {
130 return value.clone();
131 }
132
133 Value::Null
134 }
135}
136
137pub fn execute_publish_json(
150 message: &mut Message,
151 config: &PublishConfig,
152) -> Result<(usize, Vec<Change>)> {
153 debug!(
154 "PublishJson: Serializing 'data.{}' to 'data.{}'",
155 config.source, config.target
156 );
157
158 let source_data = config.extract_source(message);
160
161 if source_data.is_null() {
162 return Err(DataflowError::Validation(format!(
163 "PublishJson: Source 'data.{}' not found or is null",
164 config.source
165 )));
166 }
167
168 let json_string = if config.pretty {
170 serde_json::to_string_pretty(&source_data)
171 } else {
172 serde_json::to_string(&source_data)
173 }
174 .map_err(|e| DataflowError::Validation(format!("Failed to serialize to JSON: {}", e)))?;
175
176 let old_value = message
178 .data()
179 .get(&config.target)
180 .cloned()
181 .unwrap_or(Value::Null);
182
183 let new_value = Value::String(json_string);
184
185 if let Some(data_obj) = message.data_mut().as_object_mut() {
187 data_obj.insert(config.target.clone(), new_value.clone());
188 }
189
190 message.invalidate_context_cache();
192
193 debug!(
194 "PublishJson: Successfully serialized to 'data.{}'",
195 config.target
196 );
197
198 Ok((
199 200,
200 vec![Change {
201 path: Arc::from(format!("data.{}", config.target)),
202 old_value: Arc::new(old_value),
203 new_value: Arc::new(new_value),
204 }],
205 ))
206}
207
208pub fn execute_publish_xml(
221 message: &mut Message,
222 config: &PublishConfig,
223) -> Result<(usize, Vec<Change>)> {
224 debug!(
225 "PublishXml: Serializing 'data.{}' to 'data.{}'",
226 config.source, config.target
227 );
228
229 let source_data = config.extract_source(message);
231
232 if source_data.is_null() {
233 return Err(DataflowError::Validation(format!(
234 "PublishXml: Source 'data.{}' not found or is null",
235 config.source
236 )));
237 }
238
239 let xml_string = json_to_xml(&source_data, &config.root_element)?;
241
242 let old_value = message
244 .data()
245 .get(&config.target)
246 .cloned()
247 .unwrap_or(Value::Null);
248
249 let new_value = Value::String(xml_string);
250
251 if let Some(data_obj) = message.data_mut().as_object_mut() {
253 data_obj.insert(config.target.clone(), new_value.clone());
254 }
255
256 message.invalidate_context_cache();
258
259 debug!(
260 "PublishXml: Successfully serialized to 'data.{}'",
261 config.target
262 );
263
264 Ok((
265 200,
266 vec![Change {
267 path: Arc::from(format!("data.{}", config.target)),
268 old_value: Arc::new(old_value),
269 new_value: Arc::new(new_value),
270 }],
271 ))
272}
273
274fn json_to_xml(value: &Value, root_element: &str) -> Result<String> {
278 let mut buffer = String::new();
279
280 match value {
282 Value::Object(_) => {
283 buffer.push_str(&format!("<{}>", root_element));
285
286 let content = serialize_value_to_xml_content(value)?;
288 buffer.push_str(&content);
289
290 buffer.push_str(&format!("</{}>", root_element));
291 }
292 Value::Array(arr) => {
293 buffer.push_str(&format!("<{}>", root_element));
295 for item in arr {
296 buffer.push_str("<item>");
297 let content = serialize_value_to_xml_content(item)?;
298 buffer.push_str(&content);
299 buffer.push_str("</item>");
300 }
301 buffer.push_str(&format!("</{}>", root_element));
302 }
303 _ => {
304 buffer.push_str(&format!("<{}>", root_element));
306 buffer.push_str(&value_to_xml_string(value));
307 buffer.push_str(&format!("</{}>", root_element));
308 }
309 }
310
311 Ok(buffer)
312}
313
314fn serialize_value_to_xml_content(value: &Value) -> Result<String> {
316 let mut result = String::new();
317
318 match value {
319 Value::Object(map) => {
320 for (key, val) in map {
321 let safe_key = sanitize_xml_name(key);
323 result.push_str(&format!("<{}>", safe_key));
324
325 match val {
326 Value::Object(_) | Value::Array(_) => {
327 result.push_str(&serialize_value_to_xml_content(val)?);
328 }
329 _ => {
330 result.push_str(&value_to_xml_string(val));
331 }
332 }
333
334 result.push_str(&format!("</{}>", safe_key));
335 }
336 }
337 Value::Array(arr) => {
338 for item in arr {
339 result.push_str("<item>");
340 match item {
341 Value::Object(_) | Value::Array(_) => {
342 result.push_str(&serialize_value_to_xml_content(item)?);
343 }
344 _ => {
345 result.push_str(&value_to_xml_string(item));
346 }
347 }
348 result.push_str("</item>");
349 }
350 }
351 _ => {
352 result.push_str(&value_to_xml_string(value));
353 }
354 }
355
356 Ok(result)
357}
358
359fn value_to_xml_string(value: &Value) -> String {
361 match value {
362 Value::Null => String::new(),
363 Value::Bool(b) => b.to_string(),
364 Value::Number(n) => n.to_string(),
365 Value::String(s) => escape_xml(s),
366 _ => String::new(),
367 }
368}
369
/// Escapes the five characters that have predefined XML entities so that `s`
/// can be embedded safely as element text or inside an attribute value.
///
/// `&` -> `&amp;`, `<` -> `&lt;`, `>` -> `&gt;`, `"` -> `&quot;`, `'` -> `&apos;`.
/// All other characters are copied through unchanged.
fn escape_xml(s: &str) -> String {
    // Single pass: each input character is inspected exactly once, so an
    // ampersand produced by an earlier substitution can never be re-escaped.
    let mut escaped = String::with_capacity(s.len());

    for ch in s.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }

    escaped
}
378
/// Turns an arbitrary string into an ASCII-only XML element name.
///
/// * An empty input yields `_element`.
/// * The first character is kept if it is an ASCII letter or `_`; a leading
///   ASCII digit is kept but prefixed with `_`; anything else becomes `_`.
/// * Every later character is kept if it is an ASCII letter, digit, `-`, `_`
///   or `.`, and replaced by `_` otherwise.
fn sanitize_xml_name(name: &str) -> String {
    let mut rest = name.chars();

    let first = match rest.next() {
        Some(c) => c,
        None => return "_element".to_string(),
    };

    let mut sanitized = String::with_capacity(name.len() + 1);

    if first.is_ascii_alphabetic() || first == '_' {
        sanitized.push(first);
    } else {
        // Names may not start with a digit or punctuation: lead with '_' and
        // keep the digit so distinct numeric keys stay distinct.
        sanitized.push('_');
        if first.is_ascii_digit() {
            sanitized.push(first);
        }
    }

    sanitized.extend(rest.map(|c| {
        if c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.') {
            c
        } else {
            '_'
        }
    }));

    sanitized
}
410
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A message with an empty payload and no `data` populated.
    fn empty_message() -> Message {
        Message::new(Arc::new(json!({})))
    }

    /// A message whose `data` context is set to `data`.
    fn message_with_data(data: Value) -> Message {
        let mut message = empty_message();
        message.context["data"] = data;
        message
    }

    /// A config with default options (`pretty = false`, `root_element = "root"`).
    fn config(source: &str, target: &str) -> PublishConfig {
        PublishConfig {
            source: source.to_string(),
            target: target.to_string(),
            pretty: false,
            root_element: "root".to_string(),
        }
    }

    #[test]
    fn test_publish_config_from_json() {
        let input = json!({
            "source": "output",
            "target": "json_string"
        });

        let config = PublishConfig::from_json(&input).unwrap();
        assert_eq!(config.source, "output");
        assert_eq!(config.target, "json_string");
        assert!(!config.pretty);
        assert_eq!(config.root_element, "root");
    }

    #[test]
    fn test_publish_config_with_options() {
        let input = json!({
            "source": "data",
            "target": "xml_output",
            "pretty": true,
            "root_element": "document"
        });

        let config = PublishConfig::from_json(&input).unwrap();
        assert_eq!(config.source, "data");
        assert_eq!(config.target, "xml_output");
        assert!(config.pretty);
        assert_eq!(config.root_element, "document");
    }

    #[test]
    fn test_publish_config_missing_source() {
        let input = json!({
            "target": "output"
        });

        assert!(PublishConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_publish_config_missing_target() {
        let input = json!({
            "source": "input"
        });

        assert!(PublishConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_execute_publish_json() {
        let mut message = message_with_data(json!({
            "user": {
                "name": "John",
                "age": 30
            }
        }));
        let config = config("user", "user_json");

        let result = execute_publish_json(&mut message, &config);
        assert!(result.is_ok());

        let (status, changes) = result.unwrap();
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);

        let json_string = message.data()["user_json"].as_str().unwrap();
        assert!(json_string.contains("John"));
        assert!(json_string.contains("30"));
    }

    #[test]
    fn test_execute_publish_json_pretty() {
        let mut message = message_with_data(json!({
            "user": {
                "name": "Alice"
            }
        }));
        let config = PublishConfig {
            pretty: true,
            ..config("user", "output")
        };

        let result = execute_publish_json(&mut message, &config);
        assert!(result.is_ok());

        // Pretty output spans multiple lines.
        let json_string = message.data()["output"].as_str().unwrap();
        assert!(json_string.contains('\n'));
    }

    #[test]
    fn test_execute_publish_json_not_found() {
        let mut message = empty_message();
        let config = config("nonexistent", "output");

        let result = execute_publish_json(&mut message, &config);
        assert!(result.is_err());
    }

    #[test]
    fn test_execute_publish_xml() {
        let mut message = message_with_data(json!({
            "user": {
                "name": "John",
                "age": 30
            }
        }));
        let config = PublishConfig {
            root_element: "user".to_string(),
            ..config("user", "user_xml")
        };

        let result = execute_publish_xml(&mut message, &config);
        assert!(result.is_ok());

        let (status, _) = result.unwrap();
        assert_eq!(status, 200);

        let xml_string = message.data()["user_xml"].as_str().unwrap();
        assert!(xml_string.contains("<user>"));
        assert!(xml_string.contains("</user>"));
        assert!(xml_string.contains("<name>John</name>"));
    }

    #[test]
    fn test_execute_publish_xml_not_found() {
        let mut message = empty_message();
        let config = config("nonexistent", "output");

        let result = execute_publish_xml(&mut message, &config);
        assert!(result.is_err());
    }

    #[test]
    fn test_json_to_xml_simple() {
        let value = json!({
            "name": "Test",
            "value": 42
        });

        let result = json_to_xml(&value, "root");
        assert!(result.is_ok());

        let xml = result.unwrap();
        assert!(xml.contains("<root>"));
        assert!(xml.contains("</root>"));
        assert!(xml.contains("<name>Test</name>"));
        assert!(xml.contains("<value>42</value>"));
    }

    #[test]
    fn test_json_to_xml_nested() {
        let value = json!({
            "user": {
                "name": "Alice",
                "email": "alice@example.com"
            }
        });

        let result = json_to_xml(&value, "data");
        assert!(result.is_ok());

        let xml = result.unwrap();
        assert!(xml.contains("<data>"));
        assert!(xml.contains("<user>"));
        assert!(xml.contains("<name>Alice</name>"));
    }

    #[test]
    fn test_json_to_xml_array() {
        let value = json!([1, 2, 3]);

        let result = json_to_xml(&value, "numbers");
        assert!(result.is_ok());

        let xml = result.unwrap();
        assert!(xml.contains("<numbers>"));
        assert!(xml.contains("<item>1</item>"));
        assert!(xml.contains("<item>2</item>"));
        assert!(xml.contains("<item>3</item>"));
    }

    #[test]
    fn test_json_to_xml_special_chars() {
        let value = json!({
            "text": "<script>alert('xss')</script>"
        });

        let result = json_to_xml(&value, "root");
        assert!(result.is_ok());

        let xml = result.unwrap();
        // The markup must appear in escaped form only.
        assert!(xml.contains("&lt;script&gt;"));
        assert!(!xml.contains("<script>"));
    }

    #[test]
    fn test_escape_xml() {
        assert_eq!(escape_xml("hello"), "hello");
        assert_eq!(escape_xml("<tag>"), "&lt;tag&gt;");
        assert_eq!(escape_xml("a & b"), "a &amp; b");
        assert_eq!(escape_xml("\"quoted\""), "&quot;quoted&quot;");
    }

    #[test]
    fn test_sanitize_xml_name() {
        assert_eq!(sanitize_xml_name("valid"), "valid");
        assert_eq!(sanitize_xml_name("_valid"), "_valid");
        assert_eq!(sanitize_xml_name("123invalid"), "_123invalid");
        assert_eq!(sanitize_xml_name("has spaces"), "has_spaces");
        assert_eq!(sanitize_xml_name("has-dash"), "has-dash");
        assert_eq!(sanitize_xml_name(""), "_element");
    }

    #[test]
    fn test_execute_publish_json_nested_source() {
        let mut message = message_with_data(json!({
            "response": {
                "body": {
                    "message": "success"
                }
            }
        }));
        let config = config("response.body", "output");

        let result = execute_publish_json(&mut message, &config);
        assert!(result.is_ok());

        let json_string = message.data()["output"].as_str().unwrap();
        assert!(json_string.contains("success"));
    }
}