Skip to main content

datasynth_output/streaming/
json_sink.rs

1//! JSON streaming sinks for real-time data output.
2//!
3//! Provides both JSON array output and Newline-Delimited JSON (NDJSON) output.
4
5use std::fs::File;
6use std::io::{BufWriter, Write};
7use std::marker::PhantomData;
8use std::path::PathBuf;
9
10use serde::Serialize;
11
12use datasynth_core::error::{SynthError, SynthResult};
13use datasynth_core::traits::{StreamEvent, StreamingSink};
14
15/// JSON streaming sink that writes items as a JSON array.
16///
17/// The output format is a valid JSON array:
18/// ```json
19/// [
20///   { "field": "value1" },
21///   { "field": "value2" }
22/// ]
23/// ```
24///
25/// # Type Parameters
26///
27/// * `T` - The type of items to write. Must implement `Serialize`.
28pub struct JsonStreamingSink<T> {
29    writer: BufWriter<File>,
30    items_written: u64,
31    bytes_written: u64,
32    is_first: bool,
33    path: PathBuf,
34    pretty_print: bool,
35    _phantom: PhantomData<T>,
36}
37
38impl<T: Serialize + Send> JsonStreamingSink<T> {
39    /// Creates a new JSON streaming sink.
40    ///
41    /// # Arguments
42    ///
43    /// * `path` - Path to the output JSON file
44    ///
45    /// # Errors
46    ///
47    /// Returns an error if the file cannot be created.
48    pub fn new(path: PathBuf) -> SynthResult<Self> {
49        Self::with_options(path, false)
50    }
51
52    /// Creates a JSON streaming sink with pretty printing enabled.
53    pub fn pretty(path: PathBuf) -> SynthResult<Self> {
54        Self::with_options(path, true)
55    }
56
57    /// Creates a JSON streaming sink with configurable options.
58    fn with_options(path: PathBuf, pretty_print: bool) -> SynthResult<Self> {
59        let file = File::create(&path)?;
60        let mut writer = BufWriter::new(file);
61
62        // Write opening bracket
63        let opening = if pretty_print { "[\n" } else { "[" };
64        writer.write_all(opening.as_bytes())?;
65
66        Ok(Self {
67            writer,
68            items_written: 0,
69            bytes_written: opening.len() as u64,
70            is_first: true,
71            path,
72            pretty_print,
73            _phantom: PhantomData,
74        })
75    }
76
77    /// Returns the path to the output file.
78    pub fn path(&self) -> &PathBuf {
79        &self.path
80    }
81
82    /// Returns the total bytes written.
83    pub fn bytes_written(&self) -> u64 {
84        self.bytes_written
85    }
86
87    /// Writes a single item to JSON.
88    fn write_item(&mut self, item: &T) -> SynthResult<()> {
89        // Write separator
90        if !self.is_first {
91            let sep = if self.pretty_print { ",\n" } else { "," };
92            self.writer.write_all(sep.as_bytes())?;
93            self.bytes_written += sep.len() as u64;
94        }
95        self.is_first = false;
96
97        // Serialize the item
98        let json = if self.pretty_print {
99            let mut json = serde_json::to_string_pretty(item).map_err(|e| {
100                SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
101            })?;
102            // Indent each line
103            json = json
104                .lines()
105                .map(|line| format!("  {}", line))
106                .collect::<Vec<_>>()
107                .join("\n");
108            json
109        } else {
110            serde_json::to_string(item).map_err(|e| {
111                SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
112            })?
113        };
114
115        self.writer.write_all(json.as_bytes())?;
116        self.bytes_written += json.len() as u64;
117        self.items_written += 1;
118
119        Ok(())
120    }
121
122    /// Finalize the JSON array by writing the closing bracket.
123    fn finalize(&mut self) -> SynthResult<()> {
124        let closing = if self.pretty_print { "\n]" } else { "]" };
125        self.writer.write_all(closing.as_bytes())?;
126        self.bytes_written += closing.len() as u64;
127        self.writer.flush()?;
128        Ok(())
129    }
130}
131
132impl<T: Serialize + Send> StreamingSink<T> for JsonStreamingSink<T> {
133    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
134        match event {
135            StreamEvent::Data(item) => {
136                self.write_item(&item)?;
137            }
138            StreamEvent::Complete(_summary) => {
139                self.finalize()?;
140            }
141            StreamEvent::BatchComplete { .. } => {
142                self.writer.flush()?;
143            }
144            StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
145        }
146        Ok(())
147    }
148
149    fn flush(&mut self) -> SynthResult<()> {
150        self.writer.flush()?;
151        Ok(())
152    }
153
154    fn close(mut self) -> SynthResult<()> {
155        self.finalize()?;
156        Ok(())
157    }
158
159    fn items_processed(&self) -> u64 {
160        self.items_written
161    }
162}
163
/// Streaming sink emitting Newline-Delimited JSON (NDJSON).
///
/// Every item becomes one compact JSON document terminated by `\n`:
/// ```json
/// {"field": "value1"}
/// {"field": "value2"}
/// ```
///
/// Because each line stands alone, consumers can process the output
/// incrementally, one line at a time.
///
/// # Type Parameters
///
/// * `T` - Item type written to the file; must implement `Serialize`.
pub struct NdjsonStreamingSink<T> {
    /// Destination file of this sink.
    path: PathBuf,
    /// Buffered handle onto `path`.
    writer: BufWriter<File>,
    /// Count of bytes emitted so far, newline terminators included.
    bytes_written: u64,
    /// Count of items emitted so far.
    items_written: u64,
    /// Ties the sink to its item type without storing any `T`.
    _phantom: PhantomData<T>,
}
184
185impl<T: Serialize + Send> NdjsonStreamingSink<T> {
186    /// Creates a new NDJSON streaming sink.
187    ///
188    /// # Arguments
189    ///
190    /// * `path` - Path to the output NDJSON file
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if the file cannot be created.
195    pub fn new(path: PathBuf) -> SynthResult<Self> {
196        let file = File::create(&path)?;
197        Ok(Self {
198            writer: BufWriter::new(file),
199            items_written: 0,
200            bytes_written: 0,
201            path,
202            _phantom: PhantomData,
203        })
204    }
205
206    /// Returns the path to the output file.
207    pub fn path(&self) -> &PathBuf {
208        &self.path
209    }
210
211    /// Returns the total bytes written.
212    pub fn bytes_written(&self) -> u64 {
213        self.bytes_written
214    }
215
216    /// Writes a single item as a JSON line.
217    fn write_item(&mut self, item: &T) -> SynthResult<()> {
218        let json = serde_json::to_string(item).map_err(|e| {
219            SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
220        })?;
221
222        self.writer.write_all(json.as_bytes())?;
223        self.writer.write_all(b"\n")?;
224        self.bytes_written += json.len() as u64 + 1;
225        self.items_written += 1;
226
227        Ok(())
228    }
229}
230
231impl<T: Serialize + Send> StreamingSink<T> for NdjsonStreamingSink<T> {
232    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
233        match event {
234            StreamEvent::Data(item) => {
235                self.write_item(&item)?;
236            }
237            StreamEvent::Complete(_summary) => {
238                self.flush()?;
239            }
240            StreamEvent::BatchComplete { .. } => {
241                self.writer.flush()?;
242            }
243            StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
244        }
245        Ok(())
246    }
247
248    fn flush(&mut self) -> SynthResult<()> {
249        self.writer.flush()?;
250        Ok(())
251    }
252
253    fn close(mut self) -> SynthResult<()> {
254        self.flush()?;
255        Ok(())
256    }
257
258    fn items_processed(&self) -> u64 {
259        self.items_written
260    }
261}
262
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_core::traits::StreamSummary;
    use serde::{Deserialize, Serialize};
    use tempfile::tempdir;

    /// Minimal serializable record used to exercise both sinks.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestRecord {
        id: u32,
        name: String,
        value: f64,
    }

    /// Builds the `id`-th record of a numbered sequence (`item_<id>`, value = id).
    fn numbered(id: u32) -> TestRecord {
        TestRecord {
            id,
            name: format!("item_{}", id),
            value: f64::from(id),
        }
    }

    /// The single hand-written record shared by the one-item JSON tests.
    fn single() -> TestRecord {
        TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        }
    }

    #[test]
    fn test_json_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");
        let mut sink = JsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        sink.process(StreamEvent::Data(single())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        // The file must parse back as a JSON array of records.
        let text = std::fs::read_to_string(&path).unwrap();
        let parsed: Vec<TestRecord> = serde_json::from_str(&text).unwrap();
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].id, 1);
    }

    #[test]
    fn test_json_streaming_sink_multiple_items() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");
        let mut sink = JsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        for record in (0..5).map(numbered) {
            sink.process(StreamEvent::Data(record)).unwrap();
        }
        sink.process(StreamEvent::Complete(StreamSummary::new(5, 100)))
            .unwrap();

        let text = std::fs::read_to_string(&path).unwrap();
        let parsed: Vec<TestRecord> = serde_json::from_str(&text).unwrap();
        assert_eq!(parsed.len(), 5);
    }

    #[test]
    fn test_json_streaming_sink_pretty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");
        let mut sink = JsonStreamingSink::<TestRecord>::pretty(path.clone()).unwrap();

        sink.process(StreamEvent::Data(single())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        let text = std::fs::read_to_string(&path).unwrap();
        // Pretty output spans several lines and carries indentation.
        assert!(text.contains('\n'));
        assert!(text.contains("  "));
    }

    #[test]
    fn test_ndjson_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.ndjson");
        let mut sink = NdjsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        for record in (0..3).map(numbered) {
            sink.process(StreamEvent::Data(record)).unwrap();
        }
        sink.close().unwrap();

        // One record per line, in insertion order.
        let text = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = text.lines().collect();
        assert_eq!(lines.len(), 3);

        for (expected_id, line) in (0u32..).zip(&lines) {
            let record: TestRecord = serde_json::from_str(line).unwrap();
            assert_eq!(record.id, expected_id);
        }
    }

    #[test]
    fn test_ndjson_items_processed() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.ndjson");
        let mut sink = NdjsonStreamingSink::<TestRecord>::new(path).unwrap();

        for record in (0..10).map(numbered) {
            sink.process(StreamEvent::Data(record)).unwrap();
        }

        assert_eq!(sink.items_processed(), 10);
    }
}