Skip to main content

datasynth_core/plugins/
csv_echo.rs

//! Example `SinkPlugin` that writes `GeneratedRecord`s to an in-memory buffer
//! as CSV-like lines.
//!
//! This plugin demonstrates the full sink lifecycle: initialize, write, finalize.
5
6use crate::error::SynthError;
7use crate::traits::plugin::{GeneratedRecord, SinkPlugin, SinkSummary};
8
/// A sink plugin that formats records as CSV-like lines into a `String` buffer.
///
/// # Configuration
///
/// Accepts a JSON object with an optional `delimiter` field (single character,
/// defaults to `,`).
///
/// ```json
/// { "delimiter": ";" }
/// ```
#[derive(Debug, Clone)]
pub struct CsvEchoSink {
    /// Column delimiter character.
    delimiter: char,
    /// Accumulated output lines, one per record written.
    buffer: Vec<String>,
    /// Total records written since the sink was last initialized.
    record_count: usize,
}
27
28impl Default for CsvEchoSink {
29    fn default() -> Self {
30        Self {
31            delimiter: ',',
32            buffer: Vec::new(),
33            record_count: 0,
34        }
35    }
36}
37
38impl CsvEchoSink {
39    /// Create a new `CsvEchoSink` with default settings.
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    /// Return a reference to the accumulated output lines.
45    pub fn lines(&self) -> &[String] {
46        &self.buffer
47    }
48
49    /// Return the total number of records written.
50    pub fn record_count(&self) -> usize {
51        self.record_count
52    }
53}
54
55impl SinkPlugin for CsvEchoSink {
56    fn name(&self) -> &str {
57        "csv_echo"
58    }
59
60    fn initialize(&mut self, config: &serde_json::Value) -> Result<(), SynthError> {
61        if let Some(delim) = config.get("delimiter").and_then(|v| v.as_str()) {
62            let mut chars = delim.chars();
63            self.delimiter = chars
64                .next()
65                .ok_or_else(|| SynthError::generation("delimiter must be a non-empty string"))?;
66        }
67        self.buffer.clear();
68        self.record_count = 0;
69        Ok(())
70    }
71
72    fn write_records(&mut self, records: &[GeneratedRecord]) -> Result<usize, SynthError> {
73        for record in records {
74            // Collect field keys in sorted order for deterministic output.
75            let mut keys: Vec<&String> = record.fields.keys().collect();
76            keys.sort();
77
78            let values: Vec<String> = keys
79                .iter()
80                .map(|k| {
81                    record
82                        .fields
83                        .get(*k)
84                        .map(|v| match v {
85                            serde_json::Value::String(s) => s.clone(),
86                            other => other.to_string(),
87                        })
88                        .unwrap_or_default()
89                })
90                .collect();
91
92            let line = values.join(&self.delimiter.to_string());
93            self.buffer.push(line);
94        }
95        self.record_count += records.len();
96        Ok(records.len())
97    }
98
99    fn finalize(&mut self) -> Result<SinkSummary, SynthError> {
100        let total_bytes: usize = self.buffer.iter().map(|l| l.len()).sum();
101        let mut summary = SinkSummary::new(self.record_count);
102        summary.bytes_written = Some(total_bytes as u64);
103        summary
104            .metadata
105            .insert("delimiter".to_string(), self.delimiter.to_string());
106        Ok(summary)
107    }
108}
109
#[cfg(test)]
// Test code may panic freely on unexpected failures.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Builds one `vendor` record with `id` and `name` string fields.
    fn vendor(id: &str, name: &str) -> GeneratedRecord {
        GeneratedRecord::new("vendor")
            .with_field("id", serde_json::json!(id))
            .with_field("name", serde_json::json!(name))
    }

    /// The two vendor records shared by the tests below.
    fn sample_records() -> Vec<GeneratedRecord> {
        vec![vendor("V001", "Acme Corp"), vendor("V002", "Globex Inc")]
    }

    /// Creates a sink and initializes it with `config`, panicking on failure.
    fn initialized_sink(config: serde_json::Value) -> CsvEchoSink {
        let mut sink = CsvEchoSink::new();
        sink.initialize(&config).expect("init should succeed");
        sink
    }

    #[test]
    fn test_csv_echo_default_delimiter() {
        let mut sink = initialized_sink(serde_json::json!({}));

        let written = sink
            .write_records(&sample_records())
            .expect("write should succeed");
        assert_eq!(written, 2);

        // Columns follow sorted key order: "id" then "name".
        assert_eq!(sink.lines(), ["V001,Acme Corp", "V002,Globex Inc"]);

        let summary = sink.finalize().expect("finalize should succeed");
        assert_eq!(summary.records_written, 2);
        assert!(summary.bytes_written.is_some());
    }

    #[test]
    fn test_csv_echo_custom_delimiter() {
        let mut sink = initialized_sink(serde_json::json!({ "delimiter": ";" }));

        sink.write_records(&sample_records())
            .expect("write should succeed");

        let first = &sink.lines()[0];
        assert_eq!(first, "V001;Acme Corp");
    }

    #[test]
    fn test_csv_echo_empty_records() {
        let mut sink = initialized_sink(serde_json::json!({}));

        let written = sink.write_records(&[]).expect("write should succeed");
        assert_eq!(written, 0);
        assert_eq!(sink.record_count(), 0);
    }

    #[test]
    fn test_csv_echo_multiple_batches() {
        let mut sink = initialized_sink(serde_json::json!({}));

        for label in ["first batch", "second batch"] {
            sink.write_records(&sample_records()).expect(label);
        }

        assert_eq!(sink.record_count(), 4);
        assert_eq!(sink.lines().len(), 4);
    }
}