mx_message/plugin/
generate.rs

1use async_trait::async_trait;
2use datafake_rs::DataGenerator;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5    AsyncFunctionHandler, FunctionConfig,
6    error::Result,
7    message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::Value;
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
/// Dataflow function handler that generates sample MX message data.
///
/// The handler is stateless: its `AsyncFunctionHandler::execute` implementation
/// reads a datafake scenario (`variables` + `schema`) from the message payload
/// and stores the generated result in the message data under a configured field.
#[derive(Debug, Clone, Copy, Default)]
pub struct Generate;
15
16#[async_trait]
17impl AsyncFunctionHandler for Generate {
18    #[instrument(skip(self, message, config, _datalogic))]
19    async fn execute(
20        &self,
21        message: &mut Message,
22        config: &FunctionConfig,
23        _datalogic: Arc<DataLogic>,
24    ) -> Result<(usize, Vec<Change>)> {
25        eprintln!("🔍 GENERATE PLUGIN: Starting datafake generation for MX message");
26        debug!("Starting datafake generation for MX message");
27
28        // Extract configuration
29        let input = match config {
30            FunctionConfig::Custom { input, name: _ } => input,
31            _ => {
32                return Err(DataflowError::Validation(
33                    "Invalid configuration type".to_string(),
34                ));
35            }
36        };
37
38        // Get the output field name for generated data
39        let generated_field = input
40            .get("generated")
41            .and_then(Value::as_str)
42            .ok_or_else(|| {
43                DataflowError::Validation("'generated' parameter is required".to_string())
44            })?;
45
46        let message_type = input
47            .get("message_type")
48            .and_then(Value::as_str)
49            .ok_or_else(|| {
50                DataflowError::Validation("'message_type' parameter is required".to_string())
51            })?;
52
53        // Get the datafake scenario from payload (contains both variables and schema)
54        let scenario = message.payload.clone();
55
56        debug!(
57            generated_field = %generated_field,
58            message_type = %message_type,
59            "Generating MX message data from payload using datafake"
60        );
61
62        // Extract variables and schema from scenario
63        let variables = scenario.get("variables").ok_or_else(|| {
64            DataflowError::Validation("Scenario missing 'variables' section in payload".to_string())
65        })?;
66
67        let schema = scenario.get("schema").ok_or_else(|| {
68            DataflowError::Validation("Scenario missing 'schema' section in payload".to_string())
69        })?;
70
71        // Create datafake config with variables and schema
72        let datafake_config = serde_json::json!({
73            "variables": variables,
74            "schema": schema
75        });
76
77        // Generate data using datafake
78        let generated_data = match DataGenerator::from_value(datafake_config) {
79            Ok(generator) => generator.generate().map_err(|e| {
80                error!(error = ?e, "Datafake generation failed");
81                DataflowError::Validation(format!("Datafake generation failed: {}", e))
82            })?,
83            Err(e) => {
84                error!(error = ?e, "Failed to create datafake generator from scenario");
85                return Err(DataflowError::Validation(format!(
86                    "Invalid datafake scenario: {}",
87                    e
88                )));
89            }
90        };
91
92        // Wrap the generated data with message_type and json_data structure
93        // This matches the expected structure for the publish plugin
94        let wrapped_data = serde_json::json!({
95            "message_type": message_type,
96            "json_data": generated_data.clone()
97        });
98
99        // Debug: Log a sample value to trace data flow
100        if let Some(cdtr_nm) = generated_data
101            .pointer("/AppHdr/BizMsgIdr")
102            .or_else(|| generated_data.pointer("/Document/FIToFICstmrCdtTrf/GrpHdr/MsgId"))
103        {
104            eprintln!("🔍 GENERATE: Generated data MsgId/BizMsgIdr: {}", cdtr_nm);
105        }
106
107        // Store the generated data in the generated field
108        let old_value = message
109            .data()
110            .get(generated_field)
111            .cloned()
112            .unwrap_or(Value::Null);
113
114        message
115            .data_mut()
116            .as_object_mut()
117            .ok_or_else(|| DataflowError::Validation("Message data must be an object".to_string()))?
118            .insert(generated_field.to_string(), wrapped_data.clone());
119
120        // Invalidate cache after modification
121        message.invalidate_context_cache();
122
123        // Debug: Verify the data was stored
124        if let Some(stored) = message.data().get(generated_field) {
125            if let Some(stored_msg_id) = stored.pointer("/json_data/AppHdr/BizMsgIdr") {
126                eprintln!("🔍 GENERATE: Verified stored MsgId: {}", stored_msg_id);
127            }
128        } else {
129            eprintln!(
130                "❌ GENERATE: Failed to store data in field '{}'",
131                generated_field
132            );
133        }
134
135        debug!(
136            message_type = %message_type,
137            "Successfully generated MX message data"
138        );
139
140        Ok((
141            200,
142            vec![Change {
143                path: Arc::from(format!("data.{}", generated_field)),
144                old_value: Arc::new(old_value),
145                new_value: Arc::new(wrapped_data),
146            }],
147        ))
148    }
149}