mx_message/plugin/
generate.rs1use async_trait::async_trait;
2use datafake_rs::DataGenerator;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5 AsyncFunctionHandler, FunctionConfig,
6 error::Result,
7 message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::Value;
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
14pub struct Generate;
15
16#[async_trait]
17impl AsyncFunctionHandler for Generate {
18 #[instrument(skip(self, message, config, _datalogic))]
19 async fn execute(
20 &self,
21 message: &mut Message,
22 config: &FunctionConfig,
23 _datalogic: Arc<DataLogic>,
24 ) -> Result<(usize, Vec<Change>)> {
25 eprintln!("🔍 GENERATE PLUGIN: Starting datafake generation for MX message");
26 debug!("Starting datafake generation for MX message");
27
28 let input = match config {
30 FunctionConfig::Custom { input, name: _ } => input,
31 _ => {
32 return Err(DataflowError::Validation(
33 "Invalid configuration type".to_string(),
34 ));
35 }
36 };
37
38 let generated_field = input
40 .get("generated")
41 .and_then(Value::as_str)
42 .ok_or_else(|| {
43 DataflowError::Validation("'generated' parameter is required".to_string())
44 })?;
45
46 let message_type = input
47 .get("message_type")
48 .and_then(Value::as_str)
49 .ok_or_else(|| {
50 DataflowError::Validation("'message_type' parameter is required".to_string())
51 })?;
52
53 let scenario = message.payload.clone();
55
56 debug!(
57 generated_field = %generated_field,
58 message_type = %message_type,
59 "Generating MX message data from payload using datafake"
60 );
61
62 let variables = scenario.get("variables").ok_or_else(|| {
64 DataflowError::Validation("Scenario missing 'variables' section in payload".to_string())
65 })?;
66
67 let schema = scenario.get("schema").ok_or_else(|| {
68 DataflowError::Validation("Scenario missing 'schema' section in payload".to_string())
69 })?;
70
71 let datafake_config = serde_json::json!({
73 "variables": variables,
74 "schema": schema
75 });
76
77 let generated_data = match DataGenerator::from_value(datafake_config) {
79 Ok(generator) => generator.generate().map_err(|e| {
80 error!(error = ?e, "Datafake generation failed");
81 DataflowError::Validation(format!("Datafake generation failed: {}", e))
82 })?,
83 Err(e) => {
84 error!(error = ?e, "Failed to create datafake generator from scenario");
85 return Err(DataflowError::Validation(format!(
86 "Invalid datafake scenario: {}",
87 e
88 )));
89 }
90 };
91
92 let wrapped_data = serde_json::json!({
95 "message_type": message_type,
96 "json_data": generated_data.clone()
97 });
98
99 if let Some(cdtr_nm) = generated_data
101 .pointer("/AppHdr/BizMsgIdr")
102 .or_else(|| generated_data.pointer("/Document/FIToFICstmrCdtTrf/GrpHdr/MsgId"))
103 {
104 eprintln!("🔍 GENERATE: Generated data MsgId/BizMsgIdr: {}", cdtr_nm);
105 }
106
107 let old_value = message
109 .data()
110 .get(generated_field)
111 .cloned()
112 .unwrap_or(Value::Null);
113
114 message
115 .data_mut()
116 .as_object_mut()
117 .ok_or_else(|| DataflowError::Validation("Message data must be an object".to_string()))?
118 .insert(generated_field.to_string(), wrapped_data.clone());
119
120 message.invalidate_context_cache();
122
123 if let Some(stored) = message.data().get(generated_field) {
125 if let Some(stored_msg_id) = stored.pointer("/json_data/AppHdr/BizMsgIdr") {
126 eprintln!("🔍 GENERATE: Verified stored MsgId: {}", stored_msg_id);
127 }
128 } else {
129 eprintln!(
130 "❌ GENERATE: Failed to store data in field '{}'",
131 generated_field
132 );
133 }
134
135 debug!(
136 message_type = %message_type,
137 "Successfully generated MX message data"
138 );
139
140 Ok((
141 200,
142 vec![Change {
143 path: Arc::from(format!("data.{}", generated_field)),
144 old_value: Arc::new(old_value),
145 new_value: Arc::new(wrapped_data),
146 }],
147 ))
148 }
149}