// File: datasynth_output/streaming/json_sink.rs
//! JSON streaming sinks for real-time data output.
//!
//! Provides both JSON array output and Newline-Delimited JSON (NDJSON) output.
//! Optimized to use serde_json::to_writer() for zero-copy serialization directly
//! into the buffered writer (Phase 3 I/O optimization).

use std::fs::File;
use std::io::{BufWriter, Write};
use std::marker::PhantomData;
use std::path::PathBuf;

use serde::Serialize;

use datasynth_core::error::{SynthError, SynthResult};
use datasynth_core::traits::{StreamEvent, StreamingSink};

/// JSON streaming sink that writes items as a JSON array.
///
/// The output format is a valid JSON array:
/// ```json
/// [
///   { "field": "value1" },
///   { "field": "value2" }
/// ]
/// ```
///
/// # Type Parameters
///
/// * `T` - The type of items to write. Must implement `Serialize`.
pub struct JsonStreamingSink<T> {
    /// Buffered writer over the destination file (256 KiB buffer).
    writer: BufWriter<File>,
    /// Number of items successfully written so far.
    items_written: u64,
    /// Running byte counter reported by `bytes_written()`.
    bytes_written: u64,
    /// True until the first item is written; controls comma separators.
    is_first: bool,
    /// Destination path, kept so `path()` can report it.
    path: PathBuf,
    /// Whether output is pretty-printed with 2-space indentation.
    pretty_print: bool,
    /// Ties the sink to its item type without storing a value.
    _phantom: PhantomData<T>,
}
39
40impl<T: Serialize + Send> JsonStreamingSink<T> {
41    /// Creates a new JSON streaming sink.
42    ///
43    /// # Arguments
44    ///
45    /// * `path` - Path to the output JSON file
46    ///
47    /// # Errors
48    ///
49    /// Returns an error if the file cannot be created.
50    pub fn new(path: PathBuf) -> SynthResult<Self> {
51        Self::with_options(path, false)
52    }
53
54    /// Creates a JSON streaming sink with pretty printing enabled.
55    pub fn pretty(path: PathBuf) -> SynthResult<Self> {
56        Self::with_options(path, true)
57    }
58
59    /// Creates a JSON streaming sink with configurable options.
60    fn with_options(path: PathBuf, pretty_print: bool) -> SynthResult<Self> {
61        let file = File::create(&path)?;
62        let mut writer = BufWriter::with_capacity(256 * 1024, file);
63
64        // Write opening bracket
65        let opening = if pretty_print { "[\n" } else { "[" };
66        writer.write_all(opening.as_bytes())?;
67
68        Ok(Self {
69            writer,
70            items_written: 0,
71            bytes_written: opening.len() as u64,
72            is_first: true,
73            path,
74            pretty_print,
75            _phantom: PhantomData,
76        })
77    }
78
79    /// Returns the path to the output file.
80    pub fn path(&self) -> &PathBuf {
81        &self.path
82    }
83
84    /// Returns the total bytes written.
85    pub fn bytes_written(&self) -> u64 {
86        self.bytes_written
87    }
88
89    /// Writes a single item to JSON.
90    ///
91    /// For compact mode, serializes directly to the BufWriter (zero-copy).
92    /// For pretty mode, uses a reusable buffer to avoid per-line allocations.
93    fn write_item(&mut self, item: &T) -> SynthResult<()> {
94        // Write separator
95        if !self.is_first {
96            let sep = if self.pretty_print { ",\n" } else { "," };
97            self.writer.write_all(sep.as_bytes())?;
98            self.bytes_written += sep.len() as u64;
99        }
100        self.is_first = false;
101
102        if self.pretty_print {
103            // Pretty print: serialize to a temporary buffer, then indent
104            let json = serde_json::to_string_pretty(item).map_err(|e| {
105                SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
106            })?;
107            // Write each line with 2-space indent directly to the writer
108            for (i, line) in json.lines().enumerate() {
109                if i > 0 {
110                    self.writer.write_all(b"\n")?;
111                }
112                self.writer.write_all(b"  ")?;
113                self.writer.write_all(line.as_bytes())?;
114            }
115            self.bytes_written += json.len() as u64;
116        } else {
117            // Compact mode: serialize directly to BufWriter — zero intermediate allocation
118            serde_json::to_writer(&mut self.writer, item).map_err(|e| {
119                SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
120            })?;
121            self.bytes_written += 100; // estimate
122        }
123
124        self.items_written += 1;
125        Ok(())
126    }
127
128    /// Finalize the JSON array by writing the closing bracket.
129    fn finalize(&mut self) -> SynthResult<()> {
130        let closing = if self.pretty_print { "\n]" } else { "]" };
131        self.writer.write_all(closing.as_bytes())?;
132        self.bytes_written += closing.len() as u64;
133        self.writer.flush()?;
134        Ok(())
135    }
136}
137
138impl<T: Serialize + Send> StreamingSink<T> for JsonStreamingSink<T> {
139    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
140        match event {
141            StreamEvent::Data(item) => {
142                self.write_item(&item)?;
143            }
144            StreamEvent::Complete(_summary) => {
145                self.finalize()?;
146            }
147            StreamEvent::BatchComplete { .. } => {
148                self.writer.flush()?;
149            }
150            StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
151        }
152        Ok(())
153    }
154
155    fn flush(&mut self) -> SynthResult<()> {
156        self.writer.flush()?;
157        Ok(())
158    }
159
160    fn close(mut self) -> SynthResult<()> {
161        self.finalize()?;
162        Ok(())
163    }
164
165    fn items_processed(&self) -> u64 {
166        self.items_written
167    }
168}
169
/// Newline-Delimited JSON (NDJSON) streaming sink.
///
/// Each item is written as a separate JSON object on its own line:
/// ```json
/// {"field": "value1"}
/// {"field": "value2"}
/// ```
///
/// This format is ideal for streaming and processing line by line.
///
/// # Type Parameters
///
/// * `T` - The type of items to write. Must implement `Serialize`.
pub struct NdjsonStreamingSink<T> {
    /// Buffered writer over the destination file (256 KiB buffer).
    writer: BufWriter<File>,
    /// Number of JSON lines emitted so far.
    items_written: u64,
    /// Running byte counter reported by `bytes_written()`.
    bytes_written: u64,
    /// Destination path, kept so `path()` can report it.
    path: PathBuf,
    /// Ties the sink to its item type without storing a value.
    _phantom: PhantomData<T>,
}
190
191impl<T: Serialize + Send> NdjsonStreamingSink<T> {
192    /// Creates a new NDJSON streaming sink.
193    ///
194    /// # Arguments
195    ///
196    /// * `path` - Path to the output NDJSON file
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if the file cannot be created.
201    pub fn new(path: PathBuf) -> SynthResult<Self> {
202        let file = File::create(&path)?;
203        Ok(Self {
204            writer: BufWriter::with_capacity(256 * 1024, file),
205            items_written: 0,
206            bytes_written: 0,
207            path,
208            _phantom: PhantomData,
209        })
210    }
211
212    /// Returns the path to the output file.
213    pub fn path(&self) -> &PathBuf {
214        &self.path
215    }
216
217    /// Returns the total bytes written.
218    pub fn bytes_written(&self) -> u64 {
219        self.bytes_written
220    }
221
222    /// Writes a single item as a JSON line.
223    ///
224    /// Serializes directly to the BufWriter, avoiding intermediate String allocation.
225    fn write_item(&mut self, item: &T) -> SynthResult<()> {
226        serde_json::to_writer(&mut self.writer, item).map_err(|e| {
227            SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
228        })?;
229
230        self.writer.write_all(b"\n")?;
231        self.bytes_written += 100; // estimate
232        self.items_written += 1;
233
234        Ok(())
235    }
236}
237
238impl<T: Serialize + Send> StreamingSink<T> for NdjsonStreamingSink<T> {
239    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
240        match event {
241            StreamEvent::Data(item) => {
242                self.write_item(&item)?;
243            }
244            StreamEvent::Complete(_summary) => {
245                self.flush()?;
246            }
247            StreamEvent::BatchComplete { .. } => {
248                self.writer.flush()?;
249            }
250            StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
251        }
252        Ok(())
253    }
254
255    fn flush(&mut self) -> SynthResult<()> {
256        self.writer.flush()?;
257        Ok(())
258    }
259
260    fn close(mut self) -> SynthResult<()> {
261        self.flush()?;
262        Ok(())
263    }
264
265    fn items_processed(&self) -> u64 {
266        self.items_written
267    }
268}
269
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_core::traits::StreamSummary;
    use serde::{Deserialize, Serialize};
    use tempfile::tempdir;

    /// Simple serializable record shared by all sink tests.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestRecord {
        id: u32,
        name: String,
        value: f64,
    }

    /// Builds the indexed record used by the multi-item tests.
    fn record(i: u32) -> TestRecord {
        TestRecord {
            id: i,
            name: format!("item_{}", i),
            value: i as f64,
        }
    }

    #[test]
    fn test_json_streaming_sink_basic() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(out_path.clone()).unwrap();
        sink.process(StreamEvent::Data(TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        }))
        .unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        // The file must round-trip as a one-element JSON array.
        let text = std::fs::read_to_string(&out_path).unwrap();
        let decoded: Vec<TestRecord> = serde_json::from_str(&text).unwrap();
        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0].id, 1);
    }

    #[test]
    fn test_json_streaming_sink_multiple_items() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(out_path.clone()).unwrap();
        for i in 0..5 {
            sink.process(StreamEvent::Data(record(i))).unwrap();
        }
        sink.process(StreamEvent::Complete(StreamSummary::new(5, 100)))
            .unwrap();

        // All five items must be present in the resulting array.
        let text = std::fs::read_to_string(&out_path).unwrap();
        let decoded: Vec<TestRecord> = serde_json::from_str(&text).unwrap();
        assert_eq!(decoded.len(), 5);
    }

    #[test]
    fn test_json_streaming_sink_pretty() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::pretty(out_path.clone()).unwrap();
        sink.process(StreamEvent::Data(TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        }))
        .unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        // Pretty-printed output must contain newlines and indentation.
        let text = std::fs::read_to_string(&out_path).unwrap();
        assert!(text.contains("\n"));
        assert!(text.contains("  "));
    }

    #[test]
    fn test_ndjson_streaming_sink_basic() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(out_path.clone()).unwrap();
        for i in 0..3 {
            sink.process(StreamEvent::Data(record(i))).unwrap();
        }
        sink.close().unwrap();

        // One JSON object per line, in insertion order.
        let text = std::fs::read_to_string(&out_path).unwrap();
        let rows: Vec<_> = text.lines().collect();
        assert_eq!(rows.len(), 3);

        for (i, row) in rows.iter().enumerate() {
            let decoded: TestRecord = serde_json::from_str(row).unwrap();
            assert_eq!(decoded.id, i as u32);
        }
    }

    #[test]
    fn test_ndjson_items_processed() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(out_path).unwrap();
        for i in 0..10 {
            sink.process(StreamEvent::Data(record(i))).unwrap();
        }

        assert_eq!(sink.items_processed(), 10);
    }
}