datasynth_output/streaming/
csv_sink.rs1use std::fs::File;
8use std::io::{BufWriter, Write};
9use std::marker::PhantomData;
10use std::path::PathBuf;
11
12use serde::Serialize;
13
14use datasynth_core::error::{SynthError, SynthResult};
15use datasynth_core::traits::{StreamEvent, StreamingSink};
16
/// Streaming sink that serializes items of type `T` as CSV rows into a file.
///
/// Rows are first serialized into an in-memory scratch buffer and then copied
/// into a buffered file writer, which lets the sink keep an exact count of the
/// bytes it has handed to the writer.
///
/// `Debug` is derived so the sink can be logged or embedded in other
/// `Debug`-deriving types; the derived impl is available whenever `T: Debug`.
#[derive(Debug)]
pub struct CsvStreamingSink<T> {
    /// Buffered writer over the output file.
    writer: BufWriter<File>,
    /// Number of items successfully written so far.
    items_written: u64,
    /// Number of bytes handed to `writer` so far, including any header line.
    bytes_written: u64,
    /// Whether a header row has already been emitted; once `true`, rows are
    /// serialized without a header.
    header_written: bool,
    /// Path of the output file, as supplied to the constructor.
    path: PathBuf,
    /// Scratch buffer reused for serializing each item, so a fresh buffer is
    /// not allocated per row.
    serialize_buf: Vec<u8>,
    /// Ties the sink to its item type without storing a `T`.
    _phantom: PhantomData<T>,
}
47
48impl<T: Serialize + Send> CsvStreamingSink<T> {
49 pub fn new(path: PathBuf) -> SynthResult<Self> {
59 let file = File::create(&path)?;
60 Ok(Self {
61 writer: BufWriter::with_capacity(256 * 1024, file),
62 items_written: 0,
63 bytes_written: 0,
64 header_written: false,
65 path,
66 serialize_buf: Vec::with_capacity(4096),
67 _phantom: PhantomData,
68 })
69 }
70
71 pub fn with_header(path: PathBuf, header: &str) -> SynthResult<Self> {
78 let file = File::create(&path)?;
79 let mut writer = BufWriter::with_capacity(256 * 1024, file);
80 let header_line = format!("{}\n", header);
81 writer.write_all(header_line.as_bytes())?;
82 let bytes_written = header_line.len() as u64;
83
84 Ok(Self {
85 writer,
86 items_written: 0,
87 bytes_written,
88 header_written: true,
89 path,
90 serialize_buf: Vec::with_capacity(4096),
91 _phantom: PhantomData,
92 })
93 }
94
95 pub fn path(&self) -> &PathBuf {
97 &self.path
98 }
99
100 pub fn bytes_written(&self) -> u64 {
102 self.bytes_written
103 }
104
105 fn write_item(&mut self, item: &T) -> SynthResult<()> {
107 self.serialize_buf.clear();
109
110 {
111 let mut wtr = csv::WriterBuilder::new()
112 .has_headers(!self.header_written)
113 .from_writer(&mut self.serialize_buf);
114
115 wtr.serialize(item).map_err(|e| {
116 SynthError::generation(format!("Failed to serialize item to CSV: {}", e))
117 })?;
118
119 wtr.flush().map_err(|e| {
121 SynthError::generation(format!("Failed to flush CSV writer: {}", e))
122 })?;
123 }
124
125 self.writer.write_all(&self.serialize_buf)?;
126 self.bytes_written += self.serialize_buf.len() as u64;
127 self.header_written = true;
128 self.items_written += 1;
129
130 Ok(())
131 }
132}
133
134impl<T: Serialize + Send> StreamingSink<T> for CsvStreamingSink<T> {
135 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
136 match event {
137 StreamEvent::Data(item) => {
138 self.write_item(&item)?;
139 }
140 StreamEvent::Complete(_summary) => {
141 self.flush()?;
142 }
143 StreamEvent::BatchComplete { .. } => {
144 self.writer.flush()?;
146 }
147 StreamEvent::Progress(_) | StreamEvent::Error(_) => {
148 }
150 }
151 Ok(())
152 }
153
154 fn flush(&mut self) -> SynthResult<()> {
155 self.writer.flush()?;
156 Ok(())
157 }
158
159 fn close(mut self) -> SynthResult<()> {
160 self.flush()?;
161 Ok(())
162 }
163
164 fn items_processed(&self) -> u64 {
165 self.items_written
166 }
167}
168
#[cfg(test)]
// Tests may unwrap freely: a failure should abort the test with a panic.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use tempfile::tempdir;

    /// Header row expected for [`TestRecord`].
    const HEADER: &str = "id,name,value";

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestRecord {
        id: u32,
        name: String,
        value: f64,
    }

    /// Builds the single record shared by the one-row tests.
    fn sample_record() -> TestRecord {
        TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        }
    }

    /// A sink created with `new` emits the derived header once, then the row,
    /// and its counters match what ends up on disk.
    #[test]
    fn test_csv_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink = CsvStreamingSink::<TestRecord>::new(path.clone()).unwrap();
        sink.process(StreamEvent::Data(sample_record())).unwrap();

        assert_eq!(sink.items_processed(), 1);
        let reported_bytes = sink.bytes_written();
        sink.close().unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<_> = content.lines().collect();
        assert_eq!(lines, [HEADER, "1,test,42.5"]);
        assert_eq!(reported_bytes, content.len() as u64);
    }

    /// Writing many items produces one header line followed by one line per
    /// item; the header must not be repeated.
    #[test]
    fn test_csv_streaming_sink_multiple_items() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink = CsvStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        for i in 0..10 {
            let record = TestRecord {
                id: i,
                name: format!("item_{}", i),
                value: i as f64 * 1.5,
            };
            sink.process(StreamEvent::Data(record)).unwrap();
        }

        assert_eq!(sink.items_processed(), 10);
        let reported_bytes = sink.bytes_written();
        sink.close().unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<_> = content.lines().collect();
        assert_eq!(lines.len(), 11);
        assert_eq!(lines[0], HEADER);
        assert_eq!(lines.iter().filter(|line| **line == HEADER).count(), 1);
        assert!(lines[1].starts_with("0,item_0,"));
        assert!(lines[10].starts_with("9,item_9,"));
        assert_eq!(reported_bytes, content.len() as u64);
    }

    /// A sink created with `with_header` writes the supplied header verbatim
    /// and suppresses the serializer's own header row.
    #[test]
    fn test_csv_streaming_sink_with_header() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink =
            CsvStreamingSink::<TestRecord>::with_header(path.clone(), HEADER).unwrap();
        sink.process(StreamEvent::Data(sample_record())).unwrap();

        assert_eq!(sink.items_processed(), 1);
        let reported_bytes = sink.bytes_written();
        sink.close().unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<_> = content.lines().collect();
        assert_eq!(lines, [HEADER, "1,test,42.5"]);
        assert_eq!(reported_bytes, content.len() as u64);
    }
}
252}