datasynth_output/streaming/
csv_sink.rs1use std::fs::File;
6use std::io::{BufWriter, Write};
7use std::marker::PhantomData;
8use std::path::PathBuf;
9
10use serde::Serialize;
11
12use datasynth_core::error::{SynthError, SynthResult};
13use datasynth_core::traits::{StreamEvent, StreamingSink};
14
/// Streaming sink that writes items of type `T` as CSV rows to a file.
///
/// Output goes through a [`BufWriter`], so rows are only guaranteed to reach
/// the file once the writer has been flushed.
///
/// `Debug` is derived so the sink can appear in logs and assertion messages;
/// the derived impl is available whenever `T: Debug`.
#[derive(Debug)]
pub struct CsvStreamingSink<T> {
    /// Buffered handle to the output file.
    writer: BufWriter<File>,
    /// Number of items written so far.
    items_written: u64,
    /// Number of bytes handed to `writer` so far, header line included.
    bytes_written: u64,
    /// `true` once a header line has been written (or supplied up front), so
    /// later rows are emitted without one.
    header_written: bool,
    /// Location of the output file.
    path: PathBuf,
    /// Ties the sink to its item type without storing a `T`.
    _phantom: PhantomData<T>,
}
42
43impl<T: Serialize + Send> CsvStreamingSink<T> {
44 pub fn new(path: PathBuf) -> SynthResult<Self> {
54 let file = File::create(&path)?;
55 Ok(Self {
56 writer: BufWriter::new(file),
57 items_written: 0,
58 bytes_written: 0,
59 header_written: false,
60 path,
61 _phantom: PhantomData,
62 })
63 }
64
65 pub fn with_header(path: PathBuf, header: &str) -> SynthResult<Self> {
72 let file = File::create(&path)?;
73 let mut writer = BufWriter::new(file);
74 let header_line = format!("{}\n", header);
75 writer.write_all(header_line.as_bytes())?;
76 let bytes_written = header_line.len() as u64;
77
78 Ok(Self {
79 writer,
80 items_written: 0,
81 bytes_written,
82 header_written: true,
83 path,
84 _phantom: PhantomData,
85 })
86 }
87
88 pub fn path(&self) -> &PathBuf {
90 &self.path
91 }
92
93 pub fn bytes_written(&self) -> u64 {
95 self.bytes_written
96 }
97
98 fn write_item(&mut self, item: &T) -> SynthResult<()> {
100 let mut wtr = csv::WriterBuilder::new()
102 .has_headers(!self.header_written)
103 .from_writer(Vec::new());
104
105 wtr.serialize(item).map_err(|e| {
106 SynthError::generation(format!("Failed to serialize item to CSV: {}", e))
107 })?;
108
109 let data = wtr
110 .into_inner()
111 .map_err(|e| SynthError::generation(format!("Failed to flush CSV writer: {}", e)))?;
112
113 self.writer.write_all(&data)?;
114 self.bytes_written += data.len() as u64;
115 self.header_written = true;
116 self.items_written += 1;
117
118 Ok(())
119 }
120}
121
122impl<T: Serialize + Send> StreamingSink<T> for CsvStreamingSink<T> {
123 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
124 match event {
125 StreamEvent::Data(item) => {
126 self.write_item(&item)?;
127 }
128 StreamEvent::Complete(_summary) => {
129 self.flush()?;
130 }
131 StreamEvent::BatchComplete { .. } => {
132 self.writer.flush()?;
134 }
135 StreamEvent::Progress(_) | StreamEvent::Error(_) => {
136 }
138 }
139 Ok(())
140 }
141
142 fn flush(&mut self) -> SynthResult<()> {
143 self.writer.flush()?;
144 Ok(())
145 }
146
147 fn close(mut self) -> SynthResult<()> {
148 self.flush()?;
149 Ok(())
150 }
151
152 fn items_processed(&self) -> u64 {
153 self.items_written
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use tempfile::tempdir;

    /// Header line that matches the field names of [`TestRecord`].
    const HEADER: &str = "id,name,value";

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestRecord {
        id: u32,
        name: String,
        value: f64,
    }

    /// Builds the record shared by the single-item tests.
    fn sample_record() -> TestRecord {
        TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        }
    }

    /// A single item written through `new` yields an auto-generated header
    /// plus exactly one data row, and the counters match what hit the disk.
    #[test]
    fn test_csv_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink = CsvStreamingSink::<TestRecord>::new(path.clone()).unwrap();
        assert_eq!(sink.path(), &path);

        sink.process(StreamEvent::Data(sample_record())).unwrap();
        assert_eq!(sink.items_processed(), 1);
        let reported_bytes = sink.bytes_written();
        sink.close().unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        assert!(content.contains("id"));
        assert!(content.contains("test"));
        assert!(content.contains("42.5"));

        let lines: Vec<_> = content.lines().collect();
        assert_eq!(lines, vec![HEADER, "1,test,42.5"]);
        assert_eq!(reported_bytes, content.len() as u64);
    }

    /// The header is emitted once, followed by one row per item in order.
    #[test]
    fn test_csv_streaming_sink_multiple_items() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink = CsvStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        for i in 0..10 {
            let record = TestRecord {
                id: i,
                name: format!("item_{}", i),
                value: i as f64 * 1.5,
            };
            sink.process(StreamEvent::Data(record)).unwrap();
        }

        assert_eq!(sink.items_processed(), 10);
        let reported_bytes = sink.bytes_written();
        sink.close().unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<_> = content.lines().collect();
        // One header line plus ten data rows.
        assert_eq!(lines.len(), 11);
        assert_eq!(lines[0], HEADER);
        assert_eq!(lines.iter().filter(|line| **line == HEADER).count(), 1);
        for (i, row) in lines.iter().skip(1).enumerate() {
            let prefix = format!("{},item_{},", i, i);
            assert!(row.starts_with(&prefix), "unexpected row {:?}", row);
        }
        assert_eq!(reported_bytes, content.len() as u64);
    }

    /// A caller-supplied header replaces the auto-generated one: it appears
    /// exactly once and is followed directly by the data row.
    #[test]
    fn test_csv_streaming_sink_with_header() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink =
            CsvStreamingSink::<TestRecord>::with_header(path.clone(), HEADER).unwrap();
        // Header plus its trailing newline is already accounted for.
        assert_eq!(sink.bytes_written(), HEADER.len() as u64 + 1);
        assert_eq!(sink.items_processed(), 0);

        sink.process(StreamEvent::Data(sample_record())).unwrap();
        assert_eq!(sink.items_processed(), 1);
        let reported_bytes = sink.bytes_written();
        sink.close().unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<_> = content.lines().collect();
        assert_eq!(lines[0], HEADER);
        assert_eq!(lines, vec![HEADER, "1,test,42.5"]);
        assert_eq!(reported_bytes, content.len() as u64);
    }
}