Skip to main content

datasynth_core/plugins/
csv_echo.rs

//! Example `SinkPlugin` that writes `GeneratedRecord`s to an in-memory buffer
//! as CSV-like lines.
//!
//! This plugin demonstrates the full sink lifecycle: initialize, write, finalize.
5
6use crate::error::SynthError;
7use crate::traits::plugin::{GeneratedRecord, SinkPlugin, SinkSummary};
8
/// A sink plugin that formats records as CSV-like lines into an in-memory
/// buffer of lines.
///
/// Each written record becomes one line: its field values, ordered by field
/// name, joined with the configured delimiter. Values are emitted verbatim (no
/// quoting or escaping), which is why the output is "CSV-like" rather than
/// strict CSV.
///
/// # Configuration
///
/// Accepts a JSON object with an optional `delimiter` field (single character,
/// defaults to `,`).
///
/// ```json
/// { "delimiter": ";" }
/// ```
#[derive(Debug, Clone)]
pub struct CsvEchoSink {
    /// Column delimiter character.
    delimiter: char,
    /// Accumulated output lines, one entry per written record.
    buffer: Vec<String>,
    /// Total records written so far.
    record_count: usize,
}
27
28impl Default for CsvEchoSink {
29    fn default() -> Self {
30        Self {
31            delimiter: ',',
32            buffer: Vec::new(),
33            record_count: 0,
34        }
35    }
36}
37
38impl CsvEchoSink {
39    /// Create a new `CsvEchoSink` with default settings.
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    /// Return a reference to the accumulated output lines.
45    pub fn lines(&self) -> &[String] {
46        &self.buffer
47    }
48
49    /// Return the total number of records written.
50    pub fn record_count(&self) -> usize {
51        self.record_count
52    }
53}
54
55impl SinkPlugin for CsvEchoSink {
56    fn name(&self) -> &str {
57        "csv_echo"
58    }
59
60    fn initialize(&mut self, config: &serde_json::Value) -> Result<(), SynthError> {
61        if let Some(delim) = config.get("delimiter").and_then(|v| v.as_str()) {
62            let mut chars = delim.chars();
63            self.delimiter = chars
64                .next()
65                .ok_or_else(|| SynthError::generation("delimiter must be a non-empty string"))?;
66        }
67        self.buffer.clear();
68        self.record_count = 0;
69        Ok(())
70    }
71
72    fn write_records(&mut self, records: &[GeneratedRecord]) -> Result<usize, SynthError> {
73        for record in records {
74            // Collect field keys in sorted order for deterministic output.
75            let mut keys: Vec<&String> = record.fields.keys().collect();
76            keys.sort();
77
78            let values: Vec<String> = keys
79                .iter()
80                .map(|k| {
81                    record
82                        .fields
83                        .get(*k)
84                        .map(|v| match v {
85                            serde_json::Value::String(s) => s.clone(),
86                            other => other.to_string(),
87                        })
88                        .unwrap_or_default()
89                })
90                .collect();
91
92            let line = values.join(&self.delimiter.to_string());
93            self.buffer.push(line);
94        }
95        self.record_count += records.len();
96        Ok(records.len())
97    }
98
99    fn finalize(&mut self) -> Result<SinkSummary, SynthError> {
100        let total_bytes: usize = self.buffer.iter().map(std::string::String::len).sum();
101        let mut summary = SinkSummary::new(self.record_count);
102        summary.bytes_written = Some(total_bytes as u64);
103        summary
104            .metadata
105            .insert("delimiter".to_string(), self.delimiter.to_string());
106        Ok(summary)
107    }
108}
109
#[cfg(test)]
mod tests {
    use super::*;

    /// Two `vendor` records, each carrying an `id` and a `name` field.
    fn sample_records() -> Vec<GeneratedRecord> {
        [("V001", "Acme Corp"), ("V002", "Globex Inc")]
            .into_iter()
            .map(|(id, name)| {
                GeneratedRecord::new("vendor")
                    .with_field("id", serde_json::json!(id))
                    .with_field("name", serde_json::json!(name))
            })
            .collect()
    }

    /// Create a sink and initialize it with `config`, panicking if that fails.
    fn ready_sink(config: serde_json::Value) -> CsvEchoSink {
        let mut sink = CsvEchoSink::new();
        sink.initialize(&config).expect("init should succeed");
        sink
    }

    /// With an empty config the comma delimiter is used and the summary is filled in.
    #[test]
    fn test_csv_echo_default_delimiter() {
        let mut sink = ready_sink(serde_json::json!({}));

        let batch = sample_records();
        let written = sink.write_records(&batch).expect("write should succeed");
        assert_eq!(written, 2);

        // Values appear in key order: "id" first, then "name".
        let output = sink.lines();
        assert_eq!(output.len(), 2);
        assert_eq!(output[0], "V001,Acme Corp");
        assert_eq!(output[1], "V002,Globex Inc");

        let summary = sink.finalize().expect("finalize should succeed");
        assert_eq!(summary.records_written, 2);
        assert!(summary.bytes_written.is_some());
    }

    /// A configured delimiter replaces the default comma.
    #[test]
    fn test_csv_echo_custom_delimiter() {
        let mut sink = ready_sink(serde_json::json!({ "delimiter": ";" }));

        let batch = sample_records();
        sink.write_records(&batch).expect("write should succeed");

        assert_eq!(sink.lines()[0], "V001;Acme Corp");
    }

    /// Writing an empty batch reports zero and leaves the counter at zero.
    #[test]
    fn test_csv_echo_empty_records() {
        let mut sink = ready_sink(serde_json::json!({}));

        let written = sink.write_records(&[]).expect("write should succeed");

        assert_eq!(written, 0);
        assert_eq!(sink.record_count(), 0);
    }

    /// Successive batches accumulate in both the counter and the line buffer.
    #[test]
    fn test_csv_echo_multiple_batches() {
        let mut sink = ready_sink(serde_json::json!({}));

        let batch = sample_records();
        sink.write_records(&batch).expect("first batch");
        sink.write_records(&batch).expect("second batch");

        assert_eq!(sink.record_count(), 4);
        assert_eq!(sink.lines().len(), 4);
    }
}