mx_message/plugin/
generate.rs1use async_trait::async_trait;
2use datafake_rs::DataGenerator;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5 AsyncFunctionHandler, FunctionConfig,
6 error::Result,
7 message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::Value;
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
14pub struct Generate;
15
16#[async_trait]
17impl AsyncFunctionHandler for Generate {
18 #[instrument(skip(self, message, config, _datalogic))]
19 async fn execute(
20 &self,
21 message: &mut Message,
22 config: &FunctionConfig,
23 _datalogic: Arc<DataLogic>,
24 ) -> Result<(usize, Vec<Change>)> {
25 debug!("Starting datafake generation for MX message");
26
27 let input = match config {
29 FunctionConfig::Custom { input, name: _ } => input,
30 _ => {
31 return Err(DataflowError::Validation(
32 "Invalid configuration type".to_string(),
33 ));
34 }
35 };
36
37 let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
39 DataflowError::Validation("'target' parameter is required".to_string())
40 })?;
41
42 let scenario = (*message.payload).clone();
44
45 let generated_data = match DataGenerator::from_value(scenario) {
47 Ok(generator) => generator.generate().map_err(|e| {
48 error!(error = ?e, "Datafake generation failed");
49 DataflowError::Validation(format!("Datafake generation failed: {}", e))
50 })?,
51 Err(e) => {
52 error!(error = ?e, "Failed to create datafake generator from scenario");
53 return Err(DataflowError::Validation(format!(
54 "Invalid datafake scenario: {}",
55 e
56 )));
57 }
58 };
59
60 let old_value = message
62 .data()
63 .get(target_field)
64 .cloned()
65 .unwrap_or(Value::Null);
66
67 message
68 .data_mut()
69 .as_object_mut()
70 .ok_or_else(|| DataflowError::Validation("Message data must be an object".to_string()))?
71 .insert(target_field.to_string(), generated_data.clone());
72
73 message.invalidate_context_cache();
75
76 Ok((
77 200,
78 vec![Change {
79 path: Arc::from(format!("data.{}", target_field)),
80 old_value: Arc::new(old_value),
81 new_value: Arc::new(generated_data),
82 }],
83 ))
84 }
85}