Skip to main content

datasynth_core/plugins/
timestamp_enricher.rs

1//! Example `TransformPlugin` that enriches records with generation metadata.
2//!
3//! Adds a UTC timestamp and plugin version to every record that passes through.
4
5use crate::error::SynthError;
6use crate::traits::plugin::{GeneratedRecord, TransformPlugin};
7use chrono::Utc;
8
/// A transform plugin that adds `_generated_at` (an RFC 3339 / ISO 8601 UTC
/// timestamp) and `_plugin_version` fields to each record.
///
/// This is a stateless unit struct: it holds no configuration, so values are
/// trivially cloneable and can be printed with `{:?}` for diagnostics.
#[derive(Debug, Clone)]
pub struct TimestampEnricher;
12
13impl TimestampEnricher {
14    /// Create a new `TimestampEnricher`.
15    pub fn new() -> Self {
16        Self
17    }
18}
19
20impl Default for TimestampEnricher {
21    fn default() -> Self {
22        Self::new()
23    }
24}
25
26impl TransformPlugin for TimestampEnricher {
27    fn name(&self) -> &str {
28        "timestamp_enricher"
29    }
30
31    fn transform(&self, records: Vec<GeneratedRecord>) -> Result<Vec<GeneratedRecord>, SynthError> {
32        let now = Utc::now().to_rfc3339();
33        let version = serde_json::Value::String("1.0.0".to_string());
34
35        let enriched = records
36            .into_iter()
37            .map(|mut record| {
38                record
39                    .fields
40                    .insert("_generated_at".to_string(), serde_json::json!(now));
41                record
42                    .fields
43                    .insert("_plugin_version".to_string(), version.clone());
44                record
45            })
46            .collect();
47
48        Ok(enriched)
49    }
50}
51
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs a fresh `TimestampEnricher` over `records`, panicking if the
    /// transform reports an error.
    fn enrich(records: Vec<GeneratedRecord>) -> Vec<GeneratedRecord> {
        TimestampEnricher::new()
            .transform(records)
            .expect("transform should succeed")
    }

    /// Asserts that `record` carries a `_generated_at` string that parses as
    /// an RFC 3339 timestamp, not merely that the key is present.
    fn assert_valid_timestamp(record: &GeneratedRecord) {
        let generated_at = record
            .get_str("_generated_at")
            .expect("should have _generated_at field");
        assert!(
            chrono::DateTime::parse_from_rfc3339(generated_at).is_ok(),
            "_generated_at should be a valid RFC 3339 timestamp, got {}",
            generated_at
        );
    }

    #[test]
    fn test_timestamp_enricher_adds_fields() {
        let records = vec![
            GeneratedRecord::new("invoice").with_field("id", serde_json::json!("INV001")),
            GeneratedRecord::new("invoice").with_field("id", serde_json::json!("INV002")),
        ];

        let result = enrich(records);
        assert_eq!(result.len(), 2);

        for record in &result {
            assert_valid_timestamp(record);
            assert_eq!(
                record.get_str("_plugin_version"),
                Some("1.0.0"),
                "should have _plugin_version = 1.0.0"
            );
        }

        // Original fields must survive with their values and in input order.
        let ids: Vec<_> = result.iter().map(|record| record.get_str("id")).collect();
        assert_eq!(
            ids,
            vec![Some("INV001"), Some("INV002")],
            "original id field should be preserved"
        );
    }

    #[test]
    fn test_timestamp_enricher_empty_input() {
        let result = enrich(vec![]);
        assert!(result.is_empty());
    }

    #[test]
    fn test_timestamp_enricher_preserves_existing_fields() {
        let records = vec![GeneratedRecord::new("order")
            .with_field("customer", serde_json::json!("CUST001"))
            .with_field("amount", serde_json::json!(1500.0))];

        let result = enrich(records);
        assert_eq!(result.len(), 1);
        let record = &result[0];

        assert_eq!(record.get_str("customer"), Some("CUST001"));
        assert_eq!(record.get("amount").and_then(|v| v.as_f64()), Some(1500.0));
        assert_valid_timestamp(record);
        assert_eq!(record.get_str("_plugin_version"), Some("1.0.0"));
    }
}
115}