swift_mt_message/plugin/
generate.rs1use async_trait::async_trait;
2use datafake_rs::DataGenerator;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5 AsyncFunctionHandler, FunctionConfig,
6 error::Result,
7 message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::Value;
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
14pub struct Generate;
15
16#[async_trait]
17impl AsyncFunctionHandler for Generate {
18 #[instrument(skip(self, message, config, _datalogic))]
19 async fn execute(
20 &self,
21 message: &mut Message,
22 config: &FunctionConfig,
23 _datalogic: Arc<DataLogic>,
24 ) -> Result<(usize, Vec<Change>)> {
25 debug!("Starting datafake generation");
26
27 let input = match config {
29 FunctionConfig::Custom { input, name: _ } => input,
30 _ => {
31 return Err(DataflowError::Validation(
32 "Invalid configuration type".to_string(),
33 ));
34 }
35 };
36
37 let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
39 DataflowError::Validation("'target' parameter is required".to_string())
40 })?;
41
42 let scenario = (*message.payload).clone();
44
45 debug!(
46 target_field = %target_field,
47 "Generating data using datafake"
48 );
49
50 let generated_data = match DataGenerator::from_value(scenario) {
52 Ok(generator) => generator.generate().map_err(|e| {
53 error!(error = ?e, "Datafake generation failed");
54 DataflowError::Validation(format!("Datafake generation failed: {}", e))
55 })?,
56 Err(e) => {
57 error!(error = ?e, "Failed to create datafake generator from scenario");
58 return Err(DataflowError::Validation(format!(
59 "Invalid datafake scenario: {}",
60 e
61 )));
62 }
63 };
64
65 let old_value = message
67 .data()
68 .get(target_field)
69 .cloned()
70 .unwrap_or(Value::Null);
71
72 message
73 .data_mut()
74 .as_object_mut()
75 .ok_or_else(|| DataflowError::Validation("Message data must be an object".to_string()))?
76 .insert(target_field.to_string(), generated_data.clone());
77
78 message.invalidate_context_cache();
80
81 debug!("Successfully generated data");
82
83 Ok((
84 200,
85 vec![Change {
86 path: Arc::from(format!("data.{}", target_field)),
87 old_value: Arc::new(old_value),
88 new_value: Arc::new(generated_data),
89 }],
90 ))
91 }
92}