datasynth_output/streaming/
csv_sink.rs1use std::fs::File;
8use std::io::{BufWriter, Write};
9use std::marker::PhantomData;
10use std::path::PathBuf;
11
12use serde::Serialize;
13
14use datasynth_core::error::{SynthError, SynthResult};
15use datasynth_core::traits::{StreamEvent, StreamingSink};
16
/// Streaming sink that writes items of type `T` to a CSV file.
///
/// Output goes through a [`BufWriter`] wrapping the target [`File`]; each item
/// is first serialized into a reusable scratch buffer and then copied to the
/// buffered writer.
///
/// `Debug` is derived so the sink can be logged or embedded in other
/// `Debug` types; the derived impl is available whenever `T: Debug`.
#[derive(Debug)]
pub struct CsvStreamingSink<T> {
    /// Buffered handle to the output file.
    writer: BufWriter<File>,
    /// Number of items successfully handed to `writer`.
    items_written: u64,
    /// Number of bytes handed to `writer` (header line plus serialized rows).
    /// These bytes may still be sitting in the buffer until a flush happens.
    bytes_written: u64,
    /// Whether a header line has already been emitted, either supplied by the
    /// caller at construction time or produced alongside the first item.
    header_written: bool,
    /// Location of the output file, as given at construction time.
    path: PathBuf,
    /// Scratch buffer reused across items to hold one serialized row.
    serialize_buf: Vec<u8>,
    /// Ties the sink to its item type without storing a `T`.
    _phantom: PhantomData<T>,
}
47
48impl<T: Serialize + Send> CsvStreamingSink<T> {
49 pub fn new(path: PathBuf) -> SynthResult<Self> {
59 let file = File::create(&path)?;
60 Ok(Self {
61 writer: BufWriter::with_capacity(256 * 1024, file),
62 items_written: 0,
63 bytes_written: 0,
64 header_written: false,
65 path,
66 serialize_buf: Vec::with_capacity(4096),
67 _phantom: PhantomData,
68 })
69 }
70
71 pub fn with_header(path: PathBuf, header: &str) -> SynthResult<Self> {
78 let file = File::create(&path)?;
79 let mut writer = BufWriter::with_capacity(256 * 1024, file);
80 let header_line = format!("{header}\n");
81 writer.write_all(header_line.as_bytes())?;
82 let bytes_written = header_line.len() as u64;
83
84 Ok(Self {
85 writer,
86 items_written: 0,
87 bytes_written,
88 header_written: true,
89 path,
90 serialize_buf: Vec::with_capacity(4096),
91 _phantom: PhantomData,
92 })
93 }
94
95 pub fn path(&self) -> &PathBuf {
97 &self.path
98 }
99
100 pub fn bytes_written(&self) -> u64 {
102 self.bytes_written
103 }
104
105 fn write_item(&mut self, item: &T) -> SynthResult<()> {
107 self.serialize_buf.clear();
109
110 {
111 let mut wtr = csv::WriterBuilder::new()
112 .has_headers(!self.header_written)
113 .from_writer(&mut self.serialize_buf);
114
115 wtr.serialize(item).map_err(|e| {
116 SynthError::generation(format!("Failed to serialize item to CSV: {e}"))
117 })?;
118
119 wtr.flush()
121 .map_err(|e| SynthError::generation(format!("Failed to flush CSV writer: {e}")))?;
122 }
123
124 self.writer.write_all(&self.serialize_buf)?;
125 self.bytes_written += self.serialize_buf.len() as u64;
126 self.header_written = true;
127 self.items_written += 1;
128
129 Ok(())
130 }
131}
132
133impl<T: Serialize + Send> StreamingSink<T> for CsvStreamingSink<T> {
134 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
135 match event {
136 StreamEvent::Data(item) => {
137 self.write_item(&item)?;
138 }
139 StreamEvent::Complete(_summary) => {
140 self.flush()?;
141 }
142 StreamEvent::BatchComplete { .. } => {
143 self.writer.flush()?;
145 }
146 StreamEvent::Progress(_) | StreamEvent::Error(_) => {
147 }
149 }
150 Ok(())
151 }
152
153 fn flush(&mut self) -> SynthResult<()> {
154 self.writer.flush()?;
155 Ok(())
156 }
157
158 fn close(mut self) -> SynthResult<()> {
159 self.flush()?;
160 Ok(())
161 }
162
163 fn items_processed(&self) -> u64 {
164 self.items_written
165 }
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use tempfile::tempdir;

    /// Minimal serde-serializable row used to exercise the sink.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestRecord {
        id: u32,
        name: String,
        value: f64,
    }

    /// Builds the fixture row shared by the single-record tests.
    fn sample_record() -> TestRecord {
        TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        }
    }

    /// One record through a header-less sink: header name and values all appear.
    #[test]
    fn test_csv_streaming_sink_basic() {
        let tmp = tempdir().unwrap();
        let csv_path = tmp.path().join("test.csv");

        let mut sink = CsvStreamingSink::<TestRecord>::new(csv_path.clone()).unwrap();
        sink.process(StreamEvent::Data(sample_record())).unwrap();
        sink.close().unwrap();

        let written = std::fs::read_to_string(&csv_path).unwrap();
        for needle in ["id", "test", "42.5"] {
            assert!(written.contains(needle));
        }
    }

    /// Ten records produce eleven lines: one header plus ten rows.
    #[test]
    fn test_csv_streaming_sink_multiple_items() {
        let tmp = tempdir().unwrap();
        let csv_path = tmp.path().join("test.csv");

        let mut sink = CsvStreamingSink::<TestRecord>::new(csv_path.clone()).unwrap();

        (0..10u32)
            .map(|i| TestRecord {
                id: i,
                name: format!("item_{i}"),
                value: f64::from(i) * 1.5,
            })
            .for_each(|record| sink.process(StreamEvent::Data(record)).unwrap());

        sink.close().unwrap();

        let written = std::fs::read_to_string(&csv_path).unwrap();
        assert_eq!(written.lines().count(), 11);
    }

    /// A caller-supplied header is written verbatim as the first line.
    #[test]
    fn test_csv_streaming_sink_with_header() {
        let tmp = tempdir().unwrap();
        let csv_path = tmp.path().join("test.csv");

        let mut sink =
            CsvStreamingSink::<TestRecord>::with_header(csv_path.clone(), "id,name,value")
                .unwrap();
        sink.process(StreamEvent::Data(sample_record())).unwrap();
        sink.close().unwrap();

        let written = std::fs::read_to_string(&csv_path).unwrap();
        assert_eq!(written.lines().next(), Some("id,name,value"));
    }
}