mx_message/plugin/
generate.rs

1use async_trait::async_trait;
2use datafake_rs::DataGenerator;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5    AsyncFunctionHandler, FunctionConfig,
6    error::Result,
7    message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::Value;
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
/// Stateless function handler that generates fake data with `datafake_rs`.
///
/// The handler treats the message payload as a datafake scenario and stores the
/// generated value in the message data under a configured target field; see the
/// `AsyncFunctionHandler` implementation for details.
///
/// The type carries no state, so it is cheap to copy and can be created with
/// `Generate` or `Generate::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct Generate;
15
16#[async_trait]
17impl AsyncFunctionHandler for Generate {
18    #[instrument(skip(self, message, config, _datalogic))]
19    async fn execute(
20        &self,
21        message: &mut Message,
22        config: &FunctionConfig,
23        _datalogic: Arc<DataLogic>,
24    ) -> Result<(usize, Vec<Change>)> {
25        debug!("Starting datafake generation for MX message");
26
27        // Extract configuration
28        let input = match config {
29            FunctionConfig::Custom { input, name: _ } => input,
30            _ => {
31                return Err(DataflowError::Validation(
32                    "Invalid configuration type".to_string(),
33                ));
34            }
35        };
36
37        // Get the output field name for generated data
38        let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
39            DataflowError::Validation("'target' parameter is required".to_string())
40        })?;
41
42        // Get the datafake scenario from payload
43        let scenario = (*message.payload).clone();
44
45        // Generate data using datafake
46        let generated_data = match DataGenerator::from_value(scenario) {
47            Ok(generator) => generator.generate().map_err(|e| {
48                error!(error = ?e, "Datafake generation failed");
49                DataflowError::Validation(format!("Datafake generation failed: {}", e))
50            })?,
51            Err(e) => {
52                error!(error = ?e, "Failed to create datafake generator from scenario");
53                return Err(DataflowError::Validation(format!(
54                    "Invalid datafake scenario: {}",
55                    e
56                )));
57            }
58        };
59
60        // Store the generated data in the target field
61        let old_value = message
62            .data()
63            .get(target_field)
64            .cloned()
65            .unwrap_or(Value::Null);
66
67        message
68            .data_mut()
69            .as_object_mut()
70            .ok_or_else(|| DataflowError::Validation("Message data must be an object".to_string()))?
71            .insert(target_field.to_string(), generated_data.clone());
72
73        // Invalidate cache after modification
74        message.invalidate_context_cache();
75
76        Ok((
77            200,
78            vec![Change {
79                path: Arc::from(format!("data.{}", target_field)),
80                old_value: Arc::new(old_value),
81                new_value: Arc::new(generated_data),
82            }],
83        ))
84    }
85}