Skip to main content

datasynth_core/plugins/
timestamp_enricher.rs

1//! Example `TransformPlugin` that enriches records with generation metadata.
2//!
3//! Adds a UTC timestamp and plugin version to every record that passes through.
4
5use crate::error::SynthError;
6use crate::traits::plugin::{GeneratedRecord, TransformPlugin};
7use chrono::Utc;
8
9/// A transform plugin that adds `_generated_at` (ISO 8601 UTC) and
10/// `_plugin_version` fields to each record.
11pub struct TimestampEnricher;
12
13impl TimestampEnricher {
14    /// Create a new `TimestampEnricher`.
15    pub fn new() -> Self {
16        Self
17    }
18}
19
20impl Default for TimestampEnricher {
21    fn default() -> Self {
22        Self::new()
23    }
24}
25
26impl TransformPlugin for TimestampEnricher {
27    fn name(&self) -> &str {
28        "timestamp_enricher"
29    }
30
31    fn transform(&self, records: Vec<GeneratedRecord>) -> Result<Vec<GeneratedRecord>, SynthError> {
32        let now = Utc::now().to_rfc3339();
33        let version = serde_json::Value::String("1.0.0".to_string());
34
35        let enriched = records
36            .into_iter()
37            .map(|mut record| {
38                record
39                    .fields
40                    .insert("_generated_at".to_string(), serde_json::json!(now));
41                record
42                    .fields
43                    .insert("_plugin_version".to_string(), version.clone());
44                record
45            })
46            .collect();
47
48        Ok(enriched)
49    }
50}
51
52#[cfg(test)]
53#[allow(clippy::unwrap_used)]
54mod tests {
55    use super::*;
56
57    #[test]
58    fn test_timestamp_enricher_adds_fields() {
59        let enricher = TimestampEnricher::new();
60
61        let records = vec![
62            GeneratedRecord::new("invoice").with_field("id", serde_json::json!("INV001")),
63            GeneratedRecord::new("invoice").with_field("id", serde_json::json!("INV002")),
64        ];
65
66        let result = enricher
67            .transform(records)
68            .expect("transform should succeed");
69        assert_eq!(result.len(), 2);
70
71        for record in &result {
72            assert!(
73                record.fields.contains_key("_generated_at"),
74                "should have _generated_at field"
75            );
76            assert_eq!(
77                record.get_str("_plugin_version"),
78                Some("1.0.0"),
79                "should have _plugin_version = 1.0.0"
80            );
81            // Original field should be preserved.
82            assert!(
83                record.get_str("id").is_some(),
84                "original id field should be preserved"
85            );
86        }
87    }
88
89    #[test]
90    fn test_timestamp_enricher_empty_input() {
91        let enricher = TimestampEnricher::new();
92        let result = enricher
93            .transform(vec![])
94            .expect("transform should succeed");
95        assert!(result.is_empty());
96    }
97
98    #[test]
99    fn test_timestamp_enricher_preserves_existing_fields() {
100        let enricher = TimestampEnricher::new();
101
102        let records = vec![GeneratedRecord::new("order")
103            .with_field("customer", serde_json::json!("CUST001"))
104            .with_field("amount", serde_json::json!(1500.0))];
105
106        let result = enricher
107            .transform(records)
108            .expect("transform should succeed");
109        let record = &result[0];
110
111        assert_eq!(record.get_str("customer"), Some("CUST001"));
112        assert_eq!(record.get("amount").and_then(|v| v.as_f64()), Some(1500.0));
113        assert!(record.fields.contains_key("_generated_at"));
114        assert_eq!(record.get_str("_plugin_version"), Some("1.0.0"));
115    }
116}