swift_mt_message/plugin/
generate.rs

1use async_trait::async_trait;
2use datafake_rs::DataGenerator;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5    AsyncFunctionHandler, FunctionConfig,
6    error::Result,
7    message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::Value;
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
/// Dataflow function handler that generates data with datafake.
///
/// The handler treats the message payload as a datafake scenario and stores
/// the generated value in the message data under the key named by the
/// `target` input parameter of its configuration.
pub struct Generate;
15
16#[async_trait]
17impl AsyncFunctionHandler for Generate {
18    #[instrument(skip(self, message, config, _datalogic))]
19    async fn execute(
20        &self,
21        message: &mut Message,
22        config: &FunctionConfig,
23        _datalogic: Arc<DataLogic>,
24    ) -> Result<(usize, Vec<Change>)> {
25        debug!("Starting datafake generation");
26
27        // Extract configuration
28        let input = match config {
29            FunctionConfig::Custom { input, name: _ } => input,
30            _ => {
31                return Err(DataflowError::Validation(
32                    "Invalid configuration type".to_string(),
33                ));
34            }
35        };
36
37        // Get the output field name for generated data
38        let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
39            DataflowError::Validation("'target' parameter is required".to_string())
40        })?;
41
42        // Get the datafake scenario from payload
43        let scenario = (*message.payload).clone();
44
45        debug!(
46            target_field = %target_field,
47            "Generating data using datafake"
48        );
49
50        // Generate data using datafake
51        let generated_data = match DataGenerator::from_value(scenario) {
52            Ok(generator) => generator.generate().map_err(|e| {
53                error!(error = ?e, "Datafake generation failed");
54                DataflowError::Validation(format!("Datafake generation failed: {}", e))
55            })?,
56            Err(e) => {
57                error!(error = ?e, "Failed to create datafake generator from scenario");
58                return Err(DataflowError::Validation(format!(
59                    "Invalid datafake scenario: {}",
60                    e
61                )));
62            }
63        };
64
65        // Store the generated data in the target field
66        let old_value = message
67            .data()
68            .get(target_field)
69            .cloned()
70            .unwrap_or(Value::Null);
71
72        message
73            .data_mut()
74            .as_object_mut()
75            .ok_or_else(|| DataflowError::Validation("Message data must be an object".to_string()))?
76            .insert(target_field.to_string(), generated_data.clone());
77
78        // Invalidate cache after modification
79        message.invalidate_context_cache();
80
81        debug!("Successfully generated data");
82
83        Ok((
84            200,
85            vec![Change {
86                path: Arc::from(format!("data.{}", target_field)),
87                old_value: Arc::new(old_value),
88                new_value: Arc::new(generated_data),
89            }],
90        ))
91    }
92}