mx_message/plugin/
publish.rs

1use async_trait::async_trait;
2use dataflow_rs::engine::error::DataflowError;
3use dataflow_rs::engine::{
4    AsyncFunctionHandler, FunctionConfig,
5    error::Result,
6    message::{Change, Message},
7};
8use datalogic_rs::DataLogic;
9use serde_json::Value;
10use std::sync::Arc;
11use tracing::{debug, error, instrument};
12
/// Stateless function handler that converts a JSON document found in the
/// message data into an MX message string and stores it back on the message.
///
/// The type carries no fields, so it is trivially `Copy` and `Default`;
/// `Debug` is derived so the handler can appear in diagnostics.
#[derive(Debug, Clone, Copy, Default)]
pub struct Publish;
14
15#[async_trait]
16impl AsyncFunctionHandler for Publish {
17    #[instrument(skip(self, message, config, _datalogic))]
18    async fn execute(
19        &self,
20        message: &mut Message,
21        config: &FunctionConfig,
22        _datalogic: Arc<DataLogic>,
23    ) -> Result<(usize, Vec<Change>)> {
24        debug!("Starting JSON to MX message publishing");
25
26        // Extract custom configuration
27        let input = match config {
28            FunctionConfig::Custom { input, .. } => input,
29            _ => {
30                return Err(DataflowError::Validation(
31                    "Invalid configuration type".to_string(),
32                ));
33            }
34        };
35
36        // Get source and target field names
37        let source_field = input.get("source").and_then(Value::as_str).ok_or_else(|| {
38            DataflowError::Validation("'source' parameter is required".to_string())
39        })?;
40
41        let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
42            DataflowError::Validation("'target' parameter is required".to_string())
43        })?;
44
45        // Extract JSON data from the message
46        let json_data = message.data().get(source_field).cloned().ok_or_else(|| {
47            error!(
48                source_field = %source_field,
49                available_fields = ?message.data().as_object().map(|obj| obj.keys().collect::<Vec<_>>()),
50                "JSON data field not found in message data"
51            );
52            DataflowError::Validation(format!(
53                "Field '{}' not found in message data",
54                source_field
55            ))
56        })?;
57
58        debug!(
59            source_field = %source_field,
60            target_field = %target_field,
61            "Processing JSON to MX conversion"
62        );
63
64        // Convert JSON to MX message (message type is auto-detected from AppHdr.MsgDefIdr)
65        let mx_message = self.json_to_mx(&json_data)?;
66
67        debug!(
68            message_length = mx_message.len(),
69            "MX message published successfully"
70        );
71
72        // Store the MX message in the output field
73        let old_value = message
74            .data()
75            .get(target_field)
76            .cloned()
77            .unwrap_or(Value::Null);
78
79        message.data_mut()[target_field] = Value::String(mx_message.clone());
80
81        // Invalidate cache after modifications
82        message.invalidate_context_cache();
83
84        Ok((
85            200,
86            vec![Change {
87                path: Arc::from(format!("data.{}", target_field)),
88                old_value: Arc::new(old_value),
89                new_value: Arc::new(Value::String(mx_message)),
90            }],
91        ))
92    }
93}
94
95impl Publish {
96    /// Convert JSON to MX message using the new MxMessage API
97    fn json_to_mx(&self, json_data: &Value) -> Result<String> {
98        use crate::mx_envelope::MxMessage;
99
100        // Serialize JSON to string for parsing
101        let json_str = serde_json::to_string(json_data).map_err(|e| {
102            error!(error = ?e, "Failed to serialize JSON");
103            DataflowError::Validation(format!("JSON serialization error: {}", e))
104        })?;
105
106        // Use MxMessage to deserialize and re-serialize to XML
107        let mx_message = MxMessage::from_json(&json_str).map_err(|e| {
108            error!(error = ?e, "Failed to deserialize MX message");
109            DataflowError::Validation(format!("MX deserialization error: {}", e))
110        })?;
111
112        mx_message.to_xml().map_err(|e| {
113            error!(error = ?e, "Failed to serialize to XML");
114            DataflowError::Validation(format!("XML serialization error: {}", e))
115        })
116    }
117}