swift_mt_message/plugin/
generate.rs1use async_trait::async_trait;
2use datafake_rs::DataGenerator;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5 AsyncFunctionHandler, FunctionConfig,
6 error::Result,
7 message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::Value;
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
14pub struct Generate;
15
16#[async_trait]
17impl AsyncFunctionHandler for Generate {
18 #[instrument(skip(self, message, config, _datalogic))]
19 async fn execute(
20 &self,
21 message: &mut Message,
22 config: &FunctionConfig,
23 _datalogic: Arc<DataLogic>,
24 ) -> Result<(usize, Vec<Change>)> {
25 debug!("Starting datafake generation");
26
27 let input = match config {
29 FunctionConfig::Custom { input, name: _ } => input,
30 _ => {
31 return Err(DataflowError::Validation(
32 "Invalid configuration type".to_string(),
33 ));
34 }
35 };
36
37 let schema_field = input.get("schema").and_then(Value::as_str).ok_or_else(|| {
39 DataflowError::Validation("'schema' parameter is required".to_string())
40 })?;
41
42 let generated_field = input
43 .get("generated")
44 .and_then(Value::as_str)
45 .ok_or_else(|| {
46 DataflowError::Validation("'generated' parameter is required".to_string())
47 })?;
48
49 let schema = message.data().get(schema_field).cloned().ok_or_else(|| {
51 DataflowError::Validation(format!(
52 "Schema field '{}' not found in message data",
53 schema_field
54 ))
55 })?;
56
57 debug!(
58 schema_field = %schema_field,
59 generated_field = %generated_field,
60 "Generating data using datafake"
61 );
62
63 let generated_data = match DataGenerator::from_value(schema.clone()) {
65 Ok(generator) => generator.generate().map_err(|e| {
66 error!(error = ?e, "Datafake generation failed");
67 DataflowError::Validation(format!("Datafake generation failed: {}", e))
68 })?,
69 Err(e) => {
70 error!(error = ?e, "Failed to create datafake generator from schema");
71 return Err(DataflowError::Validation(format!(
72 "Invalid datafake schema: {}",
73 e
74 )));
75 }
76 };
77
78 let old_value = message
80 .data()
81 .get(generated_field)
82 .cloned()
83 .unwrap_or(Value::Null);
84
85 message
86 .data_mut()
87 .as_object_mut()
88 .ok_or_else(|| DataflowError::Validation("Message data must be an object".to_string()))?
89 .insert(generated_field.to_string(), generated_data.clone());
90
91 message.invalidate_context_cache();
93
94 debug!("Successfully generated data");
95
96 Ok((
97 200,
98 vec![Change {
99 path: Arc::from(format!("data.{}", generated_field)),
100 old_value: Arc::new(old_value),
101 new_value: Arc::new(generated_data),
102 }],
103 ))
104 }
105}