// datasynth_core/plugins/csv_echo.rs
use std::borrow::Cow;

use crate::error::SynthError;
use crate::traits::plugin::{GeneratedRecord, SinkPlugin, SinkSummary};

/// Sink plugin that renders generated records as delimiter-separated text lines and keeps
/// them in memory.
///
/// No I/O is performed; the rendered lines can be inspected through
/// [`CsvEchoSink::lines`].
#[derive(Debug, Clone)]
pub struct CsvEchoSink {
    /// Column separator; `,` by default, overridable via the `"delimiter"` config key.
    delimiter: char,
    /// Rendered lines, one per record, in write order (stored without line terminators).
    buffer: Vec<String>,
    /// Number of records accepted since construction or the last `initialize`.
    record_count: usize,
}

28impl Default for CsvEchoSink {
29 fn default() -> Self {
30 Self {
31 delimiter: ',',
32 buffer: Vec::new(),
33 record_count: 0,
34 }
35 }
36}
37
38impl CsvEchoSink {
39 pub fn new() -> Self {
41 Self::default()
42 }
43
44 pub fn lines(&self) -> &[String] {
46 &self.buffer
47 }
48
49 pub fn record_count(&self) -> usize {
51 self.record_count
52 }
53}
54
55impl SinkPlugin for CsvEchoSink {
56 fn name(&self) -> &str {
57 "csv_echo"
58 }
59
60 fn initialize(&mut self, config: &serde_json::Value) -> Result<(), SynthError> {
61 if let Some(delim) = config.get("delimiter").and_then(|v| v.as_str()) {
62 let mut chars = delim.chars();
63 self.delimiter = chars
64 .next()
65 .ok_or_else(|| SynthError::generation("delimiter must be a non-empty string"))?;
66 }
67 self.buffer.clear();
68 self.record_count = 0;
69 Ok(())
70 }
71
72 fn write_records(&mut self, records: &[GeneratedRecord]) -> Result<usize, SynthError> {
73 for record in records {
74 let mut keys: Vec<&String> = record.fields.keys().collect();
76 keys.sort();
77
78 let values: Vec<String> = keys
79 .iter()
80 .map(|k| {
81 record
82 .fields
83 .get(*k)
84 .map(|v| match v {
85 serde_json::Value::String(s) => s.clone(),
86 other => other.to_string(),
87 })
88 .unwrap_or_default()
89 })
90 .collect();
91
92 let line = values.join(&self.delimiter.to_string());
93 self.buffer.push(line);
94 }
95 self.record_count += records.len();
96 Ok(records.len())
97 }
98
99 fn finalize(&mut self) -> Result<SinkSummary, SynthError> {
100 let total_bytes: usize = self.buffer.iter().map(|l| l.len()).sum();
101 let mut summary = SinkSummary::new(self.record_count);
102 summary.bytes_written = Some(total_bytes as u64);
103 summary
104 .metadata
105 .insert("delimiter".to_string(), self.delimiter.to_string());
106 Ok(summary)
107 }
108}
109
#[cfg(test)]
// Relaxed for test code only: an unwrap failure here is simply a failing test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Two vendor records whose values contain no delimiter, quote, or line break.
    fn sample_records() -> Vec<GeneratedRecord> {
        vec![
            GeneratedRecord::new("vendor")
                .with_field("id", serde_json::json!("V001"))
                .with_field("name", serde_json::json!("Acme Corp")),
            GeneratedRecord::new("vendor")
                .with_field("id", serde_json::json!("V002"))
                .with_field("name", serde_json::json!("Globex Inc")),
        ]
    }

    /// Returns a fresh sink initialised with `config`, panicking if initialisation fails.
    fn sink_with(config: serde_json::Value) -> CsvEchoSink {
        let mut sink = CsvEchoSink::new();
        sink.initialize(&config).expect("init should succeed");
        sink
    }

    #[test]
    fn test_csv_echo_default_delimiter() {
        let mut sink = sink_with(serde_json::json!({}));

        let written = sink
            .write_records(&sample_records())
            .expect("write should succeed");
        assert_eq!(written, 2);

        let lines = sink.lines();
        assert_eq!(lines.len(), 2);
        assert_eq!(lines[0], "V001,Acme Corp");
        assert_eq!(lines[1], "V002,Globex Inc");

        let summary = sink.finalize().expect("finalize should succeed");
        assert_eq!(summary.records_written, 2);
        // 14 + 15 bytes; no line terminators are stored, so none are counted.
        assert_eq!(summary.bytes_written, Some(29));
    }

    #[test]
    fn test_csv_echo_custom_delimiter() {
        let mut sink = sink_with(serde_json::json!({ "delimiter": ";" }));
        sink.write_records(&sample_records())
            .expect("write should succeed");

        let lines = sink.lines();
        assert_eq!(lines[0], "V001;Acme Corp");
        assert_eq!(lines[1], "V002;Globex Inc");

        let summary = sink.finalize().expect("finalize should succeed");
        assert_eq!(
            summary.metadata.get("delimiter").map(String::as_str),
            Some(";")
        );
    }

    #[test]
    fn test_csv_echo_empty_delimiter_rejected() {
        let mut sink = CsvEchoSink::new();
        assert!(sink
            .initialize(&serde_json::json!({ "delimiter": "" }))
            .is_err());
    }

    #[test]
    fn test_csv_echo_columns_sorted_by_key() {
        let mut sink = sink_with(serde_json::json!({}));
        let records = vec![GeneratedRecord::new("vendor")
            .with_field("zeta", serde_json::json!("last"))
            .with_field("alpha", serde_json::json!("first"))];

        sink.write_records(&records).expect("write should succeed");
        assert_eq!(sink.lines()[0], "first,last");
    }

    #[test]
    fn test_csv_echo_non_string_values() {
        let mut sink = sink_with(serde_json::json!({}));
        let records = vec![GeneratedRecord::new("vendor")
            .with_field("a_count", serde_json::json!(42))
            .with_field("b_active", serde_json::json!(true))
            .with_field("c_note", serde_json::json!(null))];

        sink.write_records(&records).expect("write should succeed");
        assert_eq!(sink.lines()[0], "42,true,null");
    }

    #[test]
    fn test_csv_echo_empty_records() {
        let mut sink = sink_with(serde_json::json!({}));

        let written = sink.write_records(&[]).expect("write should succeed");
        assert_eq!(written, 0);
        assert_eq!(sink.record_count(), 0);
    }

    #[test]
    fn test_csv_echo_multiple_batches() {
        let mut sink = sink_with(serde_json::json!({}));

        sink.write_records(&sample_records()).expect("first batch");
        sink.write_records(&sample_records()).expect("second batch");

        assert_eq!(sink.record_count(), 4);
        assert_eq!(sink.lines().len(), 4);
    }

    #[test]
    fn test_csv_echo_reinitialize_clears_state() {
        let mut sink = sink_with(serde_json::json!({}));
        sink.write_records(&sample_records()).expect("write should succeed");

        sink.initialize(&serde_json::json!({}))
            .expect("re-init should succeed");
        assert_eq!(sink.record_count(), 0);
        assert!(sink.lines().is_empty());
    }
}