// datasynth_output/streaming/csv_sink.rs
//! CSV streaming sink for real-time data output.
//!
//! Writes streaming data to CSV files with optional disk space monitoring.
//! Optimized to reuse a single serialization buffer across all items instead
//! of allocating a new csv::Writer per item (Phase 3 I/O optimization).

7use std::fs::File;
8use std::io::{BufWriter, Write};
9use std::marker::PhantomData;
10use std::path::PathBuf;
11
12use serde::Serialize;
13
14use datasynth_core::error::{SynthError, SynthResult};
15use datasynth_core::traits::{StreamEvent, StreamingSink};
16
17/// CSV streaming sink that writes serializable items to a CSV file.
18///
19/// This sink writes each data item as a CSV row, handling headers
20/// automatically on the first write. Uses a reusable internal buffer
21/// to avoid per-item allocations.
22///
23/// # Type Parameters
24///
25/// * `T` - The type of items to write. Must implement `Serialize`.
26///
27/// # Example
28///
29/// ```ignore
30/// use datasynth_output::streaming::CsvStreamingSink;
31/// use datasynth_core::traits::{StreamEvent, StreamingSink};
32///
33/// let mut sink = CsvStreamingSink::<MyData>::new("output.csv".into())?;
34/// sink.process(StreamEvent::Data(my_data))?;
35/// sink.close()?;
36/// ```
37pub struct CsvStreamingSink<T> {
38    writer: BufWriter<File>,
39    items_written: u64,
40    bytes_written: u64,
41    header_written: bool,
42    path: PathBuf,
43    /// Reusable serialization buffer to avoid per-item allocation.
44    serialize_buf: Vec<u8>,
45    _phantom: PhantomData<T>,
46}
47
48impl<T: Serialize + Send> CsvStreamingSink<T> {
49    /// Creates a new CSV streaming sink.
50    ///
51    /// # Arguments
52    ///
53    /// * `path` - Path to the output CSV file
54    ///
55    /// # Errors
56    ///
57    /// Returns an error if the file cannot be created.
58    pub fn new(path: PathBuf) -> SynthResult<Self> {
59        let file = File::create(&path)?;
60        Ok(Self {
61            writer: BufWriter::with_capacity(256 * 1024, file),
62            items_written: 0,
63            bytes_written: 0,
64            header_written: false,
65            path,
66            serialize_buf: Vec::with_capacity(4096),
67            _phantom: PhantomData,
68        })
69    }
70
71    /// Creates a CSV streaming sink with a pre-written header.
72    ///
73    /// # Arguments
74    ///
75    /// * `path` - Path to the output CSV file
76    /// * `header` - The header line (without newline)
77    pub fn with_header(path: PathBuf, header: &str) -> SynthResult<Self> {
78        let file = File::create(&path)?;
79        let mut writer = BufWriter::with_capacity(256 * 1024, file);
80        let header_line = format!("{header}\n");
81        writer.write_all(header_line.as_bytes())?;
82        let bytes_written = header_line.len() as u64;
83
84        Ok(Self {
85            writer,
86            items_written: 0,
87            bytes_written,
88            header_written: true,
89            path,
90            serialize_buf: Vec::with_capacity(4096),
91            _phantom: PhantomData,
92        })
93    }
94
95    /// Returns the path to the output file.
96    pub fn path(&self) -> &PathBuf {
97        &self.path
98    }
99
100    /// Returns the total bytes written.
101    pub fn bytes_written(&self) -> u64 {
102        self.bytes_written
103    }
104
105    /// Writes a single item to CSV, reusing the internal buffer.
106    fn write_item(&mut self, item: &T) -> SynthResult<()> {
107        // Clear and reuse the buffer — no new allocation after the first item
108        self.serialize_buf.clear();
109
110        {
111            let mut wtr = csv::WriterBuilder::new()
112                .has_headers(!self.header_written)
113                .from_writer(&mut self.serialize_buf);
114
115            wtr.serialize(item).map_err(|e| {
116                SynthError::generation(format!("Failed to serialize item to CSV: {e}"))
117            })?;
118
119            // Flush the csv writer into our buffer (not into the file)
120            wtr.flush()
121                .map_err(|e| SynthError::generation(format!("Failed to flush CSV writer: {e}")))?;
122        }
123
124        self.writer.write_all(&self.serialize_buf)?;
125        self.bytes_written += self.serialize_buf.len() as u64;
126        self.header_written = true;
127        self.items_written += 1;
128
129        Ok(())
130    }
131}
132
133impl<T: Serialize + Send> StreamingSink<T> for CsvStreamingSink<T> {
134    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
135        match event {
136            StreamEvent::Data(item) => {
137                self.write_item(&item)?;
138            }
139            StreamEvent::Complete(_summary) => {
140                self.flush()?;
141            }
142            StreamEvent::BatchComplete { .. } => {
143                // Optionally flush on batch complete
144                self.writer.flush()?;
145            }
146            StreamEvent::Progress(_) | StreamEvent::Error(_) => {
147                // Progress and error events don't need file output
148            }
149        }
150        Ok(())
151    }
152
153    fn flush(&mut self) -> SynthResult<()> {
154        self.writer.flush()?;
155        Ok(())
156    }
157
158    fn close(mut self) -> SynthResult<()> {
159        self.flush()?;
160        Ok(())
161    }
162
163    fn items_processed(&self) -> u64 {
164        self.items_written
165    }
166}
167
168#[cfg(test)]
169#[allow(clippy::unwrap_used)]
170mod tests {
171    use super::*;
172    use serde::{Deserialize, Serialize};
173    use tempfile::tempdir;
174
175    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
176    struct TestRecord {
177        id: u32,
178        name: String,
179        value: f64,
180    }
181
182    #[test]
183    fn test_csv_streaming_sink_basic() {
184        let dir = tempdir().unwrap();
185        let path = dir.path().join("test.csv");
186
187        let mut sink = CsvStreamingSink::<TestRecord>::new(path.clone()).unwrap();
188
189        let record = TestRecord {
190            id: 1,
191            name: "test".to_string(),
192            value: 42.5,
193        };
194
195        sink.process(StreamEvent::Data(record)).unwrap();
196        sink.close().unwrap();
197
198        // Read back and verify
199        let content = std::fs::read_to_string(&path).unwrap();
200        assert!(content.contains("id"));
201        assert!(content.contains("test"));
202        assert!(content.contains("42.5"));
203    }
204
205    #[test]
206    fn test_csv_streaming_sink_multiple_items() {
207        let dir = tempdir().unwrap();
208        let path = dir.path().join("test.csv");
209
210        let mut sink = CsvStreamingSink::<TestRecord>::new(path.clone()).unwrap();
211
212        for i in 0..10 {
213            let record = TestRecord {
214                id: i,
215                name: format!("item_{}", i),
216                value: i as f64 * 1.5,
217            };
218            sink.process(StreamEvent::Data(record)).unwrap();
219        }
220
221        sink.close().unwrap();
222
223        // Verify all items were written
224        let content = std::fs::read_to_string(&path).unwrap();
225        let lines: Vec<_> = content.lines().collect();
226        // Header + 10 data rows
227        assert_eq!(lines.len(), 11);
228    }
229
230    #[test]
231    fn test_csv_streaming_sink_with_header() {
232        let dir = tempdir().unwrap();
233        let path = dir.path().join("test.csv");
234
235        let mut sink =
236            CsvStreamingSink::<TestRecord>::with_header(path.clone(), "id,name,value").unwrap();
237
238        let record = TestRecord {
239            id: 1,
240            name: "test".to_string(),
241            value: 42.5,
242        };
243
244        sink.process(StreamEvent::Data(record)).unwrap();
245        sink.close().unwrap();
246
247        let content = std::fs::read_to_string(&path).unwrap();
248        let lines: Vec<_> = content.lines().collect();
249        assert_eq!(lines[0], "id,name,value");
250    }
251}