// datasynth_output/streaming/csv_sink.rs
//! CSV streaming sink for real-time data output.
//!
//! Writes streaming data to CSV files with optional disk space monitoring.
//! Optimized to reuse a single serialization buffer across all items instead
//! of allocating a new csv::Writer per item (Phase 3 I/O optimization).

use std::fs::File;
use std::io::{BufWriter, Write};
use std::marker::PhantomData;
use std::path::PathBuf;

use serde::Serialize;

use datasynth_core::error::{SynthError, SynthResult};
use datasynth_core::traits::{StreamEvent, StreamingSink};
/// CSV streaming sink that writes serializable items to a CSV file.
///
/// This sink writes each data item as a CSV row, handling headers
/// automatically on the first write. Uses a reusable internal buffer
/// to avoid per-item allocations.
///
/// # Type Parameters
///
/// * `T` - The type of items to write. Must implement `Serialize`.
///
/// # Example
///
/// ```ignore
/// use datasynth_output::streaming::CsvStreamingSink;
/// use datasynth_core::traits::{StreamEvent, StreamingSink};
///
/// let mut sink = CsvStreamingSink::<MyData>::new("output.csv".into())?;
/// sink.process(StreamEvent::Data(my_data))?;
/// sink.close()?;
/// ```
pub struct CsvStreamingSink<T> {
    /// Buffered handle to the output file (256 KiB buffer, see `new`).
    writer: BufWriter<File>,
    /// Count of data items written so far; reported via `items_processed()`.
    items_written: u64,
    /// Total bytes written (header plus rows); reported via `bytes_written()`.
    bytes_written: u64,
    /// Whether the CSV header row has already been emitted. Gates the
    /// `has_headers` setting of the per-item csv writer in `write_item`.
    header_written: bool,
    /// Output file path, kept for reporting via `path()`.
    path: PathBuf,
    /// Reusable serialization buffer to avoid per-item allocation.
    serialize_buf: Vec<u8>,
    /// Marker tying the sink to item type `T`; no `T` values are stored.
    _phantom: PhantomData<T>,
}
47
48impl<T: Serialize + Send> CsvStreamingSink<T> {
49    /// Creates a new CSV streaming sink.
50    ///
51    /// # Arguments
52    ///
53    /// * `path` - Path to the output CSV file
54    ///
55    /// # Errors
56    ///
57    /// Returns an error if the file cannot be created.
58    pub fn new(path: PathBuf) -> SynthResult<Self> {
59        let file = File::create(&path)?;
60        Ok(Self {
61            writer: BufWriter::with_capacity(256 * 1024, file),
62            items_written: 0,
63            bytes_written: 0,
64            header_written: false,
65            path,
66            serialize_buf: Vec::with_capacity(4096),
67            _phantom: PhantomData,
68        })
69    }
70
71    /// Creates a CSV streaming sink with a pre-written header.
72    ///
73    /// # Arguments
74    ///
75    /// * `path` - Path to the output CSV file
76    /// * `header` - The header line (without newline)
77    pub fn with_header(path: PathBuf, header: &str) -> SynthResult<Self> {
78        let file = File::create(&path)?;
79        let mut writer = BufWriter::with_capacity(256 * 1024, file);
80        let header_line = format!("{}\n", header);
81        writer.write_all(header_line.as_bytes())?;
82        let bytes_written = header_line.len() as u64;
83
84        Ok(Self {
85            writer,
86            items_written: 0,
87            bytes_written,
88            header_written: true,
89            path,
90            serialize_buf: Vec::with_capacity(4096),
91            _phantom: PhantomData,
92        })
93    }
94
95    /// Returns the path to the output file.
96    pub fn path(&self) -> &PathBuf {
97        &self.path
98    }
99
100    /// Returns the total bytes written.
101    pub fn bytes_written(&self) -> u64 {
102        self.bytes_written
103    }
104
105    /// Writes a single item to CSV, reusing the internal buffer.
106    fn write_item(&mut self, item: &T) -> SynthResult<()> {
107        // Clear and reuse the buffer — no new allocation after the first item
108        self.serialize_buf.clear();
109
110        {
111            let mut wtr = csv::WriterBuilder::new()
112                .has_headers(!self.header_written)
113                .from_writer(&mut self.serialize_buf);
114
115            wtr.serialize(item).map_err(|e| {
116                SynthError::generation(format!("Failed to serialize item to CSV: {}", e))
117            })?;
118
119            // Flush the csv writer into our buffer (not into the file)
120            wtr.flush().map_err(|e| {
121                SynthError::generation(format!("Failed to flush CSV writer: {}", e))
122            })?;
123        }
124
125        self.writer.write_all(&self.serialize_buf)?;
126        self.bytes_written += self.serialize_buf.len() as u64;
127        self.header_written = true;
128        self.items_written += 1;
129
130        Ok(())
131    }
132}
133
134impl<T: Serialize + Send> StreamingSink<T> for CsvStreamingSink<T> {
135    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
136        match event {
137            StreamEvent::Data(item) => {
138                self.write_item(&item)?;
139            }
140            StreamEvent::Complete(_summary) => {
141                self.flush()?;
142            }
143            StreamEvent::BatchComplete { .. } => {
144                // Optionally flush on batch complete
145                self.writer.flush()?;
146            }
147            StreamEvent::Progress(_) | StreamEvent::Error(_) => {
148                // Progress and error events don't need file output
149            }
150        }
151        Ok(())
152    }
153
154    fn flush(&mut self) -> SynthResult<()> {
155        self.writer.flush()?;
156        Ok(())
157    }
158
159    fn close(mut self) -> SynthResult<()> {
160        self.flush()?;
161        Ok(())
162    }
163
164    fn items_processed(&self) -> u64 {
165        self.items_written
166    }
167}
168
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use tempfile::tempdir;

    /// Simple record type used to drive the sink in tests.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestRecord {
        id: u32,
        name: String,
        value: f64,
    }

    #[test]
    fn test_csv_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink = CsvStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        let record = TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        };

        sink.process(StreamEvent::Data(record)).unwrap();
        // The accessors must reflect the single item written.
        assert_eq!(sink.items_processed(), 1);
        assert!(sink.bytes_written() > 0);
        assert_eq!(sink.path(), &path);
        sink.close().unwrap();

        // Read back and verify header + data made it to disk.
        let content = std::fs::read_to_string(&path).unwrap();
        assert!(content.contains("id"));
        assert!(content.contains("test"));
        assert!(content.contains("42.5"));
    }

    #[test]
    fn test_csv_streaming_sink_multiple_items() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink = CsvStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        for i in 0..10 {
            let record = TestRecord {
                id: i,
                name: format!("item_{}", i),
                value: i as f64 * 1.5,
            };
            sink.process(StreamEvent::Data(record)).unwrap();
        }

        assert_eq!(sink.items_processed(), 10);
        sink.close().unwrap();

        // Verify all items were written: header + 10 data rows.
        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<_> = content.lines().collect();
        assert_eq!(lines.len(), 11);
    }

    #[test]
    fn test_csv_streaming_sink_with_header() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink =
            CsvStreamingSink::<TestRecord>::with_header(path.clone(), "id,name,value").unwrap();

        let record = TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        };

        sink.process(StreamEvent::Data(record)).unwrap();
        sink.close().unwrap();

        // The pre-written header must be the first line, exactly once
        // (the per-item writer must not emit a second, derived header).
        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<_> = content.lines().collect();
        assert_eq!(lines[0], "id,name,value");
        assert_eq!(lines.len(), 2);
    }

    #[test]
    fn test_csv_streaming_sink_byte_accounting() {
        // `bytes_written()` must agree with the actual file size after a
        // final flush — this guards the manual accounting in `with_header`
        // and `write_item`.
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv");

        let mut sink = CsvStreamingSink::<TestRecord>::new(path.clone()).unwrap();
        for i in 0..3 {
            let record = TestRecord {
                id: i,
                name: format!("item_{}", i),
                value: i as f64,
            };
            sink.process(StreamEvent::Data(record)).unwrap();
        }

        let reported = sink.bytes_written();
        sink.close().unwrap();

        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, reported);
    }
}