swift_mt_message/plugin/
generate.rs

1use async_trait::async_trait;
2use datafake_rs::DataGenerator;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5    AsyncFunctionHandler, FunctionConfig,
6    error::Result,
7    message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::Value;
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
14pub struct Generate;
15
16#[async_trait]
17impl AsyncFunctionHandler for Generate {
18    #[instrument(skip(self, message, config, _datalogic))]
19    async fn execute(
20        &self,
21        message: &mut Message,
22        config: &FunctionConfig,
23        _datalogic: Arc<DataLogic>,
24    ) -> Result<(usize, Vec<Change>)> {
25        debug!("Starting datafake generation");
26
27        // Extract configuration
28        let input = match config {
29            FunctionConfig::Custom { input, name: _ } => input,
30            _ => {
31                return Err(DataflowError::Validation(
32                    "Invalid configuration type".to_string(),
33                ));
34            }
35        };
36
37        // Get schema and generated field names
38        let schema_field = input.get("schema").and_then(Value::as_str).ok_or_else(|| {
39            DataflowError::Validation("'schema' parameter is required".to_string())
40        })?;
41
42        let generated_field = input
43            .get("generated")
44            .and_then(Value::as_str)
45            .ok_or_else(|| {
46                DataflowError::Validation("'generated' parameter is required".to_string())
47            })?;
48
49        // Get the datafake schema from the schema field
50        let schema = message.data().get(schema_field).cloned().ok_or_else(|| {
51            DataflowError::Validation(format!(
52                "Schema field '{}' not found in message data",
53                schema_field
54            ))
55        })?;
56
57        debug!(
58            schema_field = %schema_field,
59            generated_field = %generated_field,
60            "Generating data using datafake"
61        );
62
63        // Generate data using datafake
64        let generated_data = match DataGenerator::from_value(schema.clone()) {
65            Ok(generator) => generator.generate().map_err(|e| {
66                error!(error = ?e, "Datafake generation failed");
67                DataflowError::Validation(format!("Datafake generation failed: {}", e))
68            })?,
69            Err(e) => {
70                error!(error = ?e, "Failed to create datafake generator from schema");
71                return Err(DataflowError::Validation(format!(
72                    "Invalid datafake schema: {}",
73                    e
74                )));
75            }
76        };
77
78        // Store the generated data in the generated field
79        let old_value = message
80            .data()
81            .get(generated_field)
82            .cloned()
83            .unwrap_or(Value::Null);
84
85        message
86            .data_mut()
87            .as_object_mut()
88            .ok_or_else(|| DataflowError::Validation("Message data must be an object".to_string()))?
89            .insert(generated_field.to_string(), generated_data.clone());
90
91        // Invalidate cache after modification
92        message.invalidate_context_cache();
93
94        debug!("Successfully generated data");
95
96        Ok((
97            200,
98            vec![Change {
99                path: Arc::from(format!("data.{}", generated_field)),
100                old_value: Arc::new(old_value),
101                new_value: Arc::new(generated_data),
102            }],
103        ))
104    }
105}