mx_message/plugin/
publish.rs1use async_trait::async_trait;
2use dataflow_rs::engine::error::DataflowError;
3use dataflow_rs::engine::{
4 AsyncFunctionHandler, FunctionConfig,
5 error::Result,
6 message::{Change, Message},
7};
8use datalogic_rs::DataLogic;
9use serde_json::Value;
10use std::sync::Arc;
11use tracing::{debug, error, instrument};
12
13pub struct Publish;
14
15#[async_trait]
16impl AsyncFunctionHandler for Publish {
17 #[instrument(skip(self, message, config, _datalogic))]
18 async fn execute(
19 &self,
20 message: &mut Message,
21 config: &FunctionConfig,
22 _datalogic: Arc<DataLogic>,
23 ) -> Result<(usize, Vec<Change>)> {
24 debug!("Starting JSON to MX message publishing");
25
26 let input = match config {
28 FunctionConfig::Custom { input, .. } => input,
29 _ => {
30 return Err(DataflowError::Validation(
31 "Invalid configuration type".to_string(),
32 ));
33 }
34 };
35
36 let source_field = input.get("source").and_then(Value::as_str).ok_or_else(|| {
38 DataflowError::Validation("'source' parameter is required".to_string())
39 })?;
40
41 let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
42 DataflowError::Validation("'target' parameter is required".to_string())
43 })?;
44
45 let json_data = message.data().get(source_field).cloned().ok_or_else(|| {
47 error!(
48 source_field = %source_field,
49 available_fields = ?message.data().as_object().map(|obj| obj.keys().collect::<Vec<_>>()),
50 "JSON data field not found in message data"
51 );
52 DataflowError::Validation(format!(
53 "Field '{}' not found in message data",
54 source_field
55 ))
56 })?;
57
58 debug!(
59 source_field = %source_field,
60 target_field = %target_field,
61 "Processing JSON to MX conversion"
62 );
63
64 let mx_message = self.json_to_mx(&json_data)?;
66
67 debug!(
68 message_length = mx_message.len(),
69 "MX message published successfully"
70 );
71
72 let old_value = message
74 .data()
75 .get(target_field)
76 .cloned()
77 .unwrap_or(Value::Null);
78
79 message.data_mut()[target_field] = Value::String(mx_message.clone());
80
81 message.invalidate_context_cache();
83
84 Ok((
85 200,
86 vec![Change {
87 path: Arc::from(format!("data.{}", target_field)),
88 old_value: Arc::new(old_value),
89 new_value: Arc::new(Value::String(mx_message)),
90 }],
91 ))
92 }
93}
94
95impl Publish {
96 fn json_to_mx(&self, json_data: &Value) -> Result<String> {
98 use crate::mx_envelope::MxMessage;
99
100 let json_str = serde_json::to_string(json_data).map_err(|e| {
102 error!(error = ?e, "Failed to serialize JSON");
103 DataflowError::Validation(format!("JSON serialization error: {}", e))
104 })?;
105
106 let mx_message = MxMessage::from_json(&json_str).map_err(|e| {
108 error!(error = ?e, "Failed to deserialize MX message");
109 DataflowError::Validation(format!("MX deserialization error: {}", e))
110 })?;
111
112 mx_message.to_xml().map_err(|e| {
113 error!(error = ?e, "Failed to serialize to XML");
114 DataflowError::Validation(format!("XML serialization error: {}", e))
115 })
116 }
117}