Skip to main content

datasynth_output/streaming/
json_sink.rs

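//! JSON streaming sinks for real-time data output.
//!
//! Provides both JSON array output and Newline-Delimited JSON (NDJSON) output.
//! Compact output is serialized with `serde_json::to_writer()` directly into the
//! buffered writer, avoiding an intermediate `String` per item (Phase 3 I/O
//! optimization). Pretty-printed JSON arrays still render each item to a `String`
//! first so that every line can be re-indented.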
6
7use std::fs::File;
8use std::io::{BufWriter, Write};
9use std::marker::PhantomData;
10use std::path::PathBuf;
11
12use serde::Serialize;
13
14use datasynth_core::error::{SynthError, SynthResult};
15use datasynth_core::traits::{StreamEvent, StreamingSink};
16
17/// JSON streaming sink that writes items as a JSON array.
18///
19/// The output format is a valid JSON array:
20/// ```json
21/// [
22///   { "field": "value1" },
23///   { "field": "value2" }
24/// ]
25/// ```
26///
27/// # Type Parameters
28///
29/// * `T` - The type of items to write. Must implement `Serialize`.
30pub struct JsonStreamingSink<T> {
31    writer: BufWriter<File>,
32    items_written: u64,
33    bytes_written: u64,
34    is_first: bool,
35    path: PathBuf,
36    pretty_print: bool,
37    _phantom: PhantomData<T>,
38}
39
40impl<T: Serialize + Send> JsonStreamingSink<T> {
41    /// Creates a new JSON streaming sink.
42    ///
43    /// # Arguments
44    ///
45    /// * `path` - Path to the output JSON file
46    ///
47    /// # Errors
48    ///
49    /// Returns an error if the file cannot be created.
50    pub fn new(path: PathBuf) -> SynthResult<Self> {
51        Self::with_options(path, false)
52    }
53
54    /// Creates a JSON streaming sink with pretty printing enabled.
55    pub fn pretty(path: PathBuf) -> SynthResult<Self> {
56        Self::with_options(path, true)
57    }
58
59    /// Creates a JSON streaming sink with configurable options.
60    fn with_options(path: PathBuf, pretty_print: bool) -> SynthResult<Self> {
61        let file = File::create(&path)?;
62        let mut writer = BufWriter::with_capacity(256 * 1024, file);
63
64        // Write opening bracket
65        let opening = if pretty_print { "[\n" } else { "[" };
66        writer.write_all(opening.as_bytes())?;
67
68        Ok(Self {
69            writer,
70            items_written: 0,
71            bytes_written: opening.len() as u64,
72            is_first: true,
73            path,
74            pretty_print,
75            _phantom: PhantomData,
76        })
77    }
78
79    /// Returns the path to the output file.
80    pub fn path(&self) -> &PathBuf {
81        &self.path
82    }
83
84    /// Returns the total bytes written.
85    pub fn bytes_written(&self) -> u64 {
86        self.bytes_written
87    }
88
89    /// Writes a single item to JSON.
90    ///
91    /// For compact mode, serializes directly to the BufWriter (zero-copy).
92    /// For pretty mode, uses a reusable buffer to avoid per-line allocations.
93    fn write_item(&mut self, item: &T) -> SynthResult<()> {
94        // Write separator
95        if !self.is_first {
96            let sep = if self.pretty_print { ",\n" } else { "," };
97            self.writer.write_all(sep.as_bytes())?;
98            self.bytes_written += sep.len() as u64;
99        }
100        self.is_first = false;
101
102        if self.pretty_print {
103            // Pretty print: serialize to a temporary buffer, then indent
104            let json = serde_json::to_string_pretty(item).map_err(|e| {
105                SynthError::generation(format!("Failed to serialize item to JSON: {e}"))
106            })?;
107            // Write each line with 2-space indent directly to the writer
108            for (i, line) in json.lines().enumerate() {
109                if i > 0 {
110                    self.writer.write_all(b"\n")?;
111                }
112                self.writer.write_all(b"  ")?;
113                self.writer.write_all(line.as_bytes())?;
114            }
115            self.bytes_written += json.len() as u64;
116        } else {
117            // Compact mode: serialize directly to BufWriter — zero intermediate allocation
118            serde_json::to_writer(&mut self.writer, item).map_err(|e| {
119                SynthError::generation(format!("Failed to serialize item to JSON: {e}"))
120            })?;
121            self.bytes_written += 100; // estimate
122        }
123
124        self.items_written += 1;
125        Ok(())
126    }
127
128    /// Finalize the JSON array by writing the closing bracket.
129    fn finalize(&mut self) -> SynthResult<()> {
130        let closing = if self.pretty_print { "\n]" } else { "]" };
131        self.writer.write_all(closing.as_bytes())?;
132        self.bytes_written += closing.len() as u64;
133        self.writer.flush()?;
134        Ok(())
135    }
136}
137
138impl<T: Serialize + Send> StreamingSink<T> for JsonStreamingSink<T> {
139    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
140        match event {
141            StreamEvent::Data(item) => {
142                self.write_item(&item)?;
143            }
144            StreamEvent::Complete(_summary) => {
145                self.finalize()?;
146            }
147            StreamEvent::BatchComplete { .. } => {
148                self.writer.flush()?;
149            }
150            StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
151        }
152        Ok(())
153    }
154
155    fn flush(&mut self) -> SynthResult<()> {
156        self.writer.flush()?;
157        Ok(())
158    }
159
160    fn close(mut self) -> SynthResult<()> {
161        self.finalize()?;
162        Ok(())
163    }
164
165    fn items_processed(&self) -> u64 {
166        self.items_written
167    }
168}
169
/// Newline-Delimited JSON (NDJSON) streaming sink.
///
/// Each item is written as a compact JSON object on its own line, terminated by `\n`:
/// ```json
/// {"field":"value1"}
/// {"field":"value2"}
/// ```
///
/// No surrounding brackets or separators are written, so every complete line is an
/// independently parseable JSON value. This makes the format well suited to streaming
/// and to line-by-line processing.
///
/// # Type Parameters
///
/// * `T` - The type of items to write. Must implement `Serialize`.
pub struct NdjsonStreamingSink<T> {
    writer: BufWriter<File>,
    items_written: u64,
    bytes_written: u64,
    path: PathBuf,
    _phantom: PhantomData<T>,
}
190
191impl<T: Serialize + Send> NdjsonStreamingSink<T> {
192    /// Creates a new NDJSON streaming sink.
193    ///
194    /// # Arguments
195    ///
196    /// * `path` - Path to the output NDJSON file
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if the file cannot be created.
201    pub fn new(path: PathBuf) -> SynthResult<Self> {
202        let file = File::create(&path)?;
203        Ok(Self {
204            writer: BufWriter::with_capacity(256 * 1024, file),
205            items_written: 0,
206            bytes_written: 0,
207            path,
208            _phantom: PhantomData,
209        })
210    }
211
212    /// Returns the path to the output file.
213    pub fn path(&self) -> &PathBuf {
214        &self.path
215    }
216
217    /// Returns the total bytes written.
218    pub fn bytes_written(&self) -> u64 {
219        self.bytes_written
220    }
221
222    /// Writes a single item as a JSON line.
223    ///
224    /// Serializes directly to the BufWriter, avoiding intermediate String allocation.
225    fn write_item(&mut self, item: &T) -> SynthResult<()> {
226        serde_json::to_writer(&mut self.writer, item).map_err(|e| {
227            SynthError::generation(format!("Failed to serialize item to JSON: {e}"))
228        })?;
229
230        self.writer.write_all(b"\n")?;
231        self.bytes_written += 100; // estimate
232        self.items_written += 1;
233
234        Ok(())
235    }
236}
237
238impl<T: Serialize + Send> StreamingSink<T> for NdjsonStreamingSink<T> {
239    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
240        match event {
241            StreamEvent::Data(item) => {
242                self.write_item(&item)?;
243            }
244            StreamEvent::Complete(_summary) => {
245                self.flush()?;
246            }
247            StreamEvent::BatchComplete { .. } => {
248                self.writer.flush()?;
249            }
250            StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
251        }
252        Ok(())
253    }
254
255    fn flush(&mut self) -> SynthResult<()> {
256        self.writer.flush()?;
257        Ok(())
258    }
259
260    fn close(mut self) -> SynthResult<()> {
261        self.flush()?;
262        Ok(())
263    }
264
265    fn items_processed(&self) -> u64 {
266        self.items_written
267    }
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_core::traits::StreamSummary;
    use serde::{Deserialize, Serialize};
    use tempfile::tempdir;

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestRecord {
        id: u32,
        name: String,
        value: f64,
    }

    /// Fixture record whose fields are all derived from `i`.
    fn record(i: u32) -> TestRecord {
        TestRecord {
            id: i,
            name: format!("item_{i}"),
            value: f64::from(i),
        }
    }

    /// The single hand-written record used by the basic and pretty tests.
    fn sample() -> TestRecord {
        TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        }
    }

    #[test]
    fn test_json_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();
        sink.process(StreamEvent::Data(sample())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        // The file must be a valid JSON array holding exactly the record we wrote.
        let content = std::fs::read_to_string(&path).unwrap();
        let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
        assert_eq!(parsed, vec![sample()]);
    }

    #[test]
    fn test_json_streaming_sink_multiple_items() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();
        for i in 0..5 {
            sink.process(StreamEvent::Data(record(i))).unwrap();
        }
        sink.process(StreamEvent::Complete(StreamSummary::new(5, 100)))
            .unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
        assert_eq!(parsed, (0..5).map(record).collect::<Vec<_>>());
    }

    #[test]
    fn test_json_streaming_sink_empty() {
        // An array with no items must still be valid JSON in both output modes.
        let dir = tempdir().unwrap();
        let compact_path = dir.path().join("compact.json");
        let pretty_path = dir.path().join("pretty.json");

        let mut compact = JsonStreamingSink::<TestRecord>::new(compact_path.clone()).unwrap();
        compact
            .process(StreamEvent::Complete(StreamSummary::new(0, 100)))
            .unwrap();
        let mut pretty = JsonStreamingSink::<TestRecord>::pretty(pretty_path.clone()).unwrap();
        pretty
            .process(StreamEvent::Complete(StreamSummary::new(0, 100)))
            .unwrap();

        for path in [&compact_path, &pretty_path] {
            let content = std::fs::read_to_string(path).unwrap();
            let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
            assert!(parsed.is_empty());
        }
    }

    #[test]
    fn test_json_streaming_sink_pretty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::pretty(path.clone()).unwrap();
        sink.process(StreamEvent::Data(sample())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        // Pretty-printed output should have newlines and indentation...
        assert!(content.contains('\n'));
        assert!(content.contains("  "));
        // ...and must still be a valid JSON array.
        let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
        assert_eq!(parsed, vec![sample()]);
    }

    #[test]
    fn test_json_streaming_sink_pretty_multiple_items() {
        // Exercises the `,\n` separator and the per-line re-indentation across items.
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::pretty(path.clone()).unwrap();
        for i in 0..3 {
            sink.process(StreamEvent::Data(record(i))).unwrap();
        }
        sink.process(StreamEvent::Complete(StreamSummary::new(3, 100)))
            .unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
        assert_eq!(parsed, (0..3).map(record).collect::<Vec<_>>());
    }

    #[test]
    fn test_ndjson_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();
        for i in 0..3 {
            sink.process(StreamEvent::Data(record(i))).unwrap();
        }
        sink.close().unwrap();

        // Read back and verify line by line: each line is one complete JSON record.
        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<_> = content.lines().collect();
        assert_eq!(lines.len(), 3);
        for (line, expected) in lines.iter().zip((0..).map(record)) {
            let parsed: TestRecord = serde_json::from_str(line).unwrap();
            assert_eq!(parsed, expected);
        }
    }

    #[test]
    fn test_ndjson_items_processed() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(path).unwrap();
        for i in 0..10 {
            sink.process(StreamEvent::Data(record(i))).unwrap();
        }

        assert_eq!(sink.items_processed(), 10);
    }
}