datasynth_output/streaming/
csv_sink.rs1use std::fs::File;
6use std::io::{BufWriter, Write};
7use std::marker::PhantomData;
8use std::path::PathBuf;
9
10use serde::Serialize;
11
12use datasynth_core::error::{SynthError, SynthResult};
13use datasynth_core::traits::{StreamEvent, StreamingSink};
14
/// Streaming sink that serializes each incoming item as a CSV record and
/// appends it to a file.
///
/// Output goes through a [`BufWriter`], so bytes may sit in memory until the
/// sink is flushed (via `flush`, a completion event, or `close`).
#[derive(Debug)]
pub struct CsvStreamingSink<T> {
    /// Buffered handle to the output file.
    writer: BufWriter<File>,
    /// Number of items successfully written so far.
    items_written: u64,
    /// Total bytes handed to `writer`, including any explicit header line.
    bytes_written: u64,
    /// Whether a header row has already been emitted (explicitly or by the
    /// first serialized item), so later items are written without headers.
    header_written: bool,
    /// Location of the output file.
    path: PathBuf,
    /// Binds the sink to its item type `T` without storing a `T`.
    _phantom: PhantomData<T>,
}
42
43impl<T: Serialize + Send> CsvStreamingSink<T> {
44 pub fn new(path: PathBuf) -> SynthResult<Self> {
54 let file = File::create(&path)?;
55 Ok(Self {
56 writer: BufWriter::new(file),
57 items_written: 0,
58 bytes_written: 0,
59 header_written: false,
60 path,
61 _phantom: PhantomData,
62 })
63 }
64
65 pub fn with_header(path: PathBuf, header: &str) -> SynthResult<Self> {
72 let file = File::create(&path)?;
73 let mut writer = BufWriter::new(file);
74 let header_line = format!("{}\n", header);
75 writer.write_all(header_line.as_bytes())?;
76 let bytes_written = header_line.len() as u64;
77
78 Ok(Self {
79 writer,
80 items_written: 0,
81 bytes_written,
82 header_written: true,
83 path,
84 _phantom: PhantomData,
85 })
86 }
87
88 pub fn path(&self) -> &PathBuf {
90 &self.path
91 }
92
93 pub fn bytes_written(&self) -> u64 {
95 self.bytes_written
96 }
97
98 fn write_item(&mut self, item: &T) -> SynthResult<()> {
100 let mut wtr = csv::WriterBuilder::new()
102 .has_headers(!self.header_written)
103 .from_writer(Vec::new());
104
105 wtr.serialize(item).map_err(|e| {
106 SynthError::generation(format!("Failed to serialize item to CSV: {}", e))
107 })?;
108
109 let data = wtr
110 .into_inner()
111 .map_err(|e| SynthError::generation(format!("Failed to flush CSV writer: {}", e)))?;
112
113 self.writer.write_all(&data)?;
114 self.bytes_written += data.len() as u64;
115 self.header_written = true;
116 self.items_written += 1;
117
118 Ok(())
119 }
120}
121
122impl<T: Serialize + Send> StreamingSink<T> for CsvStreamingSink<T> {
123 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
124 match event {
125 StreamEvent::Data(item) => {
126 self.write_item(&item)?;
127 }
128 StreamEvent::Complete(_summary) => {
129 self.flush()?;
130 }
131 StreamEvent::BatchComplete { .. } => {
132 self.writer.flush()?;
134 }
135 StreamEvent::Progress(_) | StreamEvent::Error(_) => {
136 }
138 }
139 Ok(())
140 }
141
142 fn flush(&mut self) -> SynthResult<()> {
143 self.writer.flush()?;
144 Ok(())
145 }
146
147 fn close(mut self) -> SynthResult<()> {
148 self.flush()?;
149 Ok(())
150 }
151
152 fn items_processed(&self) -> u64 {
153 self.items_written
154 }
155}
156
// `unwrap` is acceptable in tests: any failure should abort the test with a panic.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use tempfile::tempdir;

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestRecord {
        id: u32,
        name: String,
        value: f64,
    }

    /// Builds a `TestRecord` with less boilerplate at call sites.
    fn record(id: u32, name: &str, value: f64) -> TestRecord {
        TestRecord {
            id,
            name: name.to_string(),
            value,
        }
    }

    #[test]
    fn test_csv_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink = CsvStreamingSink::<TestRecord>::new(path.clone()).unwrap();
        assert_eq!(sink.path(), &path);

        sink.process(StreamEvent::Data(record(1, "test", 42.5))).unwrap();
        assert_eq!(sink.items_processed(), 1);
        sink.close().unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<_> = content.lines().collect();
        // The header is derived from the struct's field names on the first write.
        assert_eq!(lines, vec!["id,name,value", "1,test,42.5"]);
    }

    #[test]
    fn test_csv_streaming_sink_multiple_items() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink = CsvStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        for i in 0..10 {
            let name = format!("item_{}", i);
            sink.process(StreamEvent::Data(record(i, &name, i as f64 * 1.5)))
                .unwrap();
        }
        assert_eq!(sink.items_processed(), 10);

        sink.close().unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<_> = content.lines().collect();
        // One header line plus one line per item.
        assert_eq!(lines.len(), 11);
        assert_eq!(lines[0], "id,name,value");
        // The header must not be repeated for subsequent items.
        assert_eq!(
            lines.iter().filter(|line| **line == "id,name,value").count(),
            1
        );
        assert_eq!(lines[10], "9,item_9,13.5");
    }

    #[test]
    fn test_csv_streaming_sink_with_header() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink =
            CsvStreamingSink::<TestRecord>::with_header(path.clone(), "id,name,value").unwrap();

        sink.process(StreamEvent::Data(record(1, "test", 42.5))).unwrap();
        sink.close().unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<_> = content.lines().collect();
        // The explicit header replaces, rather than precedes, the automatic one.
        assert_eq!(lines, vec!["id,name,value", "1,test,42.5"]);
    }

    #[test]
    fn test_csv_streaming_sink_bytes_written_matches_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink =
            CsvStreamingSink::<TestRecord>::with_header(path.clone(), "id,name,value").unwrap();
        for i in 0..3 {
            sink.process(StreamEvent::Data(record(i, "x", 0.5))).unwrap();
        }

        let reported = sink.bytes_written();
        sink.close().unwrap();

        assert_eq!(reported, std::fs::metadata(&path).unwrap().len());
    }
}