datasynth_core/plugins/
csv_echo.rs1use crate::error::SynthError;
7use crate::traits::plugin::{GeneratedRecord, SinkPlugin, SinkSummary};
8
/// In-memory sink that renders every record as one delimiter-separated line.
///
/// Rendered lines are only kept in memory; they can be inspected through the
/// accessor methods of this type.
#[derive(Debug, Clone)]
pub struct CsvEchoSink {
    /// Character placed between adjacent field values.
    delimiter: char,
    /// One rendered line per record, in the order the records were written.
    buffer: Vec<String>,
    /// Running total of records accepted by the sink.
    record_count: usize,
}
27
28impl Default for CsvEchoSink {
29 fn default() -> Self {
30 Self {
31 delimiter: ',',
32 buffer: Vec::new(),
33 record_count: 0,
34 }
35 }
36}
37
38impl CsvEchoSink {
39 pub fn new() -> Self {
41 Self::default()
42 }
43
44 pub fn lines(&self) -> &[String] {
46 &self.buffer
47 }
48
49 pub fn record_count(&self) -> usize {
51 self.record_count
52 }
53}
54
55impl SinkPlugin for CsvEchoSink {
56 fn name(&self) -> &str {
57 "csv_echo"
58 }
59
60 fn initialize(&mut self, config: &serde_json::Value) -> Result<(), SynthError> {
61 if let Some(delim) = config.get("delimiter").and_then(|v| v.as_str()) {
62 let mut chars = delim.chars();
63 self.delimiter = chars
64 .next()
65 .ok_or_else(|| SynthError::generation("delimiter must be a non-empty string"))?;
66 }
67 self.buffer.clear();
68 self.record_count = 0;
69 Ok(())
70 }
71
72 fn write_records(&mut self, records: &[GeneratedRecord]) -> Result<usize, SynthError> {
73 for record in records {
74 let mut keys: Vec<&String> = record.fields.keys().collect();
76 keys.sort();
77
78 let values: Vec<String> = keys
79 .iter()
80 .map(|k| {
81 record
82 .fields
83 .get(*k)
84 .map(|v| match v {
85 serde_json::Value::String(s) => s.clone(),
86 other => other.to_string(),
87 })
88 .unwrap_or_default()
89 })
90 .collect();
91
92 let line = values.join(&self.delimiter.to_string());
93 self.buffer.push(line);
94 }
95 self.record_count += records.len();
96 Ok(records.len())
97 }
98
99 fn finalize(&mut self) -> Result<SinkSummary, SynthError> {
100 let total_bytes: usize = self.buffer.iter().map(std::string::String::len).sum();
101 let mut summary = SinkSummary::new(self.record_count);
102 summary.bytes_written = Some(total_bytes as u64);
103 summary
104 .metadata
105 .insert("delimiter".to_string(), self.delimiter.to_string());
106 Ok(summary)
107 }
108}
109
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds one `vendor` record carrying the given `id` and `name` fields.
    fn vendor(id: &str, name: &str) -> GeneratedRecord {
        GeneratedRecord::new("vendor")
            .with_field("id", serde_json::json!(id))
            .with_field("name", serde_json::json!(name))
    }

    /// Two vendor records shared by the tests below.
    fn sample_records() -> Vec<GeneratedRecord> {
        vec![vendor("V001", "Acme Corp"), vendor("V002", "Globex Inc")]
    }

    /// Creates a sink and initializes it with `config`.
    fn ready_sink(config: serde_json::Value) -> CsvEchoSink {
        let mut sink = CsvEchoSink::new();
        sink.initialize(&config).expect("init should succeed");
        sink
    }

    #[test]
    fn test_csv_echo_default_delimiter() {
        let mut sink = ready_sink(serde_json::json!({}));
        let batch = sample_records();

        let written = sink.write_records(&batch).expect("write should succeed");
        assert_eq!(written, 2);

        // Values appear in key order: `id` before `name`.
        let rendered = sink.lines();
        assert_eq!(rendered.len(), 2);
        assert_eq!(rendered[0], "V001,Acme Corp");
        assert_eq!(rendered[1], "V002,Globex Inc");

        let summary = sink.finalize().expect("finalize should succeed");
        assert_eq!(summary.records_written, 2);
        assert!(summary.bytes_written.is_some());
    }

    #[test]
    fn test_csv_echo_custom_delimiter() {
        let mut sink = ready_sink(serde_json::json!({ "delimiter": ";" }));
        let batch = sample_records();

        sink.write_records(&batch).expect("write should succeed");

        let rendered = sink.lines();
        assert_eq!(rendered[0], "V001;Acme Corp");
    }

    #[test]
    fn test_csv_echo_empty_records() {
        let mut sink = ready_sink(serde_json::json!({}));

        let written = sink.write_records(&[]).expect("write should succeed");

        assert_eq!(written, 0);
        assert_eq!(sink.record_count(), 0);
    }

    #[test]
    fn test_csv_echo_multiple_batches() {
        let mut sink = ready_sink(serde_json::json!({}));

        sink.write_records(&sample_records()).expect("first batch");
        sink.write_records(&sample_records()).expect("second batch");

        // Counts accumulate across batches.
        assert_eq!(sink.record_count(), 4);
        assert_eq!(sink.lines().len(), 4);
    }
}