Skip to main content

datasynth_output/streaming/
json_sink.rs

1//! JSON streaming sinks for real-time data output.
2//!
3//! Provides both JSON array output and Newline-Delimited JSON (NDJSON) output.
4
5use std::fs::File;
6use std::io::{BufWriter, Write};
7use std::marker::PhantomData;
8use std::path::PathBuf;
9
10use serde::Serialize;
11
12use datasynth_core::error::{SynthError, SynthResult};
13use datasynth_core::traits::{StreamEvent, StreamingSink};
14
15/// JSON streaming sink that writes items as a JSON array.
16///
17/// The output format is a valid JSON array:
18/// ```json
19/// [
20///   { "field": "value1" },
21///   { "field": "value2" }
22/// ]
23/// ```
24///
25/// # Type Parameters
26///
27/// * `T` - The type of items to write. Must implement `Serialize`.
28pub struct JsonStreamingSink<T> {
29    writer: BufWriter<File>,
30    items_written: u64,
31    bytes_written: u64,
32    is_first: bool,
33    path: PathBuf,
34    pretty_print: bool,
35    _phantom: PhantomData<T>,
36}
37
38impl<T: Serialize + Send> JsonStreamingSink<T> {
39    /// Creates a new JSON streaming sink.
40    ///
41    /// # Arguments
42    ///
43    /// * `path` - Path to the output JSON file
44    ///
45    /// # Errors
46    ///
47    /// Returns an error if the file cannot be created.
48    pub fn new(path: PathBuf) -> SynthResult<Self> {
49        Self::with_options(path, false)
50    }
51
52    /// Creates a JSON streaming sink with pretty printing enabled.
53    pub fn pretty(path: PathBuf) -> SynthResult<Self> {
54        Self::with_options(path, true)
55    }
56
57    /// Creates a JSON streaming sink with configurable options.
58    fn with_options(path: PathBuf, pretty_print: bool) -> SynthResult<Self> {
59        let file = File::create(&path)?;
60        let mut writer = BufWriter::new(file);
61
62        // Write opening bracket
63        let opening = if pretty_print { "[\n" } else { "[" };
64        writer.write_all(opening.as_bytes())?;
65
66        Ok(Self {
67            writer,
68            items_written: 0,
69            bytes_written: opening.len() as u64,
70            is_first: true,
71            path,
72            pretty_print,
73            _phantom: PhantomData,
74        })
75    }
76
77    /// Returns the path to the output file.
78    pub fn path(&self) -> &PathBuf {
79        &self.path
80    }
81
82    /// Returns the total bytes written.
83    pub fn bytes_written(&self) -> u64 {
84        self.bytes_written
85    }
86
87    /// Writes a single item to JSON.
88    fn write_item(&mut self, item: &T) -> SynthResult<()> {
89        // Write separator
90        if !self.is_first {
91            let sep = if self.pretty_print { ",\n" } else { "," };
92            self.writer.write_all(sep.as_bytes())?;
93            self.bytes_written += sep.len() as u64;
94        }
95        self.is_first = false;
96
97        // Serialize the item
98        let json = if self.pretty_print {
99            let mut json = serde_json::to_string_pretty(item).map_err(|e| {
100                SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
101            })?;
102            // Indent each line
103            json = json
104                .lines()
105                .map(|line| format!("  {}", line))
106                .collect::<Vec<_>>()
107                .join("\n");
108            json
109        } else {
110            serde_json::to_string(item).map_err(|e| {
111                SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
112            })?
113        };
114
115        self.writer.write_all(json.as_bytes())?;
116        self.bytes_written += json.len() as u64;
117        self.items_written += 1;
118
119        Ok(())
120    }
121
122    /// Finalize the JSON array by writing the closing bracket.
123    fn finalize(&mut self) -> SynthResult<()> {
124        let closing = if self.pretty_print { "\n]" } else { "]" };
125        self.writer.write_all(closing.as_bytes())?;
126        self.bytes_written += closing.len() as u64;
127        self.writer.flush()?;
128        Ok(())
129    }
130}
131
132impl<T: Serialize + Send> StreamingSink<T> for JsonStreamingSink<T> {
133    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
134        match event {
135            StreamEvent::Data(item) => {
136                self.write_item(&item)?;
137            }
138            StreamEvent::Complete(_summary) => {
139                self.finalize()?;
140            }
141            StreamEvent::BatchComplete { .. } => {
142                self.writer.flush()?;
143            }
144            StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
145        }
146        Ok(())
147    }
148
149    fn flush(&mut self) -> SynthResult<()> {
150        self.writer.flush()?;
151        Ok(())
152    }
153
154    fn close(mut self) -> SynthResult<()> {
155        self.finalize()?;
156        Ok(())
157    }
158
159    fn items_processed(&self) -> u64 {
160        self.items_written
161    }
162}
163
164/// Newline-Delimited JSON (NDJSON) streaming sink.
165///
166/// Each item is written as a separate JSON object on its own line:
167/// ```json
168/// {"field": "value1"}
169/// {"field": "value2"}
170/// ```
171///
172/// This format is ideal for streaming and processing line by line.
173///
174/// # Type Parameters
175///
176/// * `T` - The type of items to write. Must implement `Serialize`.
177pub struct NdjsonStreamingSink<T> {
178    writer: BufWriter<File>,
179    items_written: u64,
180    bytes_written: u64,
181    path: PathBuf,
182    _phantom: PhantomData<T>,
183}
184
185impl<T: Serialize + Send> NdjsonStreamingSink<T> {
186    /// Creates a new NDJSON streaming sink.
187    ///
188    /// # Arguments
189    ///
190    /// * `path` - Path to the output NDJSON file
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if the file cannot be created.
195    pub fn new(path: PathBuf) -> SynthResult<Self> {
196        let file = File::create(&path)?;
197        Ok(Self {
198            writer: BufWriter::new(file),
199            items_written: 0,
200            bytes_written: 0,
201            path,
202            _phantom: PhantomData,
203        })
204    }
205
206    /// Returns the path to the output file.
207    pub fn path(&self) -> &PathBuf {
208        &self.path
209    }
210
211    /// Returns the total bytes written.
212    pub fn bytes_written(&self) -> u64 {
213        self.bytes_written
214    }
215
216    /// Writes a single item as a JSON line.
217    fn write_item(&mut self, item: &T) -> SynthResult<()> {
218        let json = serde_json::to_string(item).map_err(|e| {
219            SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
220        })?;
221
222        self.writer.write_all(json.as_bytes())?;
223        self.writer.write_all(b"\n")?;
224        self.bytes_written += json.len() as u64 + 1;
225        self.items_written += 1;
226
227        Ok(())
228    }
229}
230
231impl<T: Serialize + Send> StreamingSink<T> for NdjsonStreamingSink<T> {
232    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
233        match event {
234            StreamEvent::Data(item) => {
235                self.write_item(&item)?;
236            }
237            StreamEvent::Complete(_summary) => {
238                self.flush()?;
239            }
240            StreamEvent::BatchComplete { .. } => {
241                self.writer.flush()?;
242            }
243            StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
244        }
245        Ok(())
246    }
247
248    fn flush(&mut self) -> SynthResult<()> {
249        self.writer.flush()?;
250        Ok(())
251    }
252
253    fn close(mut self) -> SynthResult<()> {
254        self.flush()?;
255        Ok(())
256    }
257
258    fn items_processed(&self) -> u64 {
259        self.items_written
260    }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_core::traits::StreamSummary;
    use serde::{Deserialize, Serialize};
    use tempfile::tempdir;

    /// Minimal serializable record used as the sink payload in these tests.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestRecord {
        id: u32,
        name: String,
        value: f64,
    }

    /// Builds the single hand-written record used by the one-item tests.
    fn sample_record() -> TestRecord {
        TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        }
    }

    /// Builds the `i`-th record of a generated sequence.
    fn numbered_record(i: u32) -> TestRecord {
        TestRecord {
            id: i,
            name: format!("item_{}", i),
            value: i as f64,
        }
    }

    #[test]
    fn test_json_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        sink.process(StreamEvent::Data(sample_record())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        // Read back and verify it's valid JSON that round-trips the record
        let content = std::fs::read_to_string(&path).unwrap();
        let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
        assert_eq!(parsed, vec![sample_record()]);

        // The byte counter must agree with what actually reached the file
        assert_eq!(sink.bytes_written(), content.len() as u64);
        assert_eq!(sink.items_processed(), 1);
    }

    #[test]
    fn test_json_streaming_sink_multiple_items() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        for i in 0..5 {
            sink.process(StreamEvent::Data(numbered_record(i))).unwrap();
        }
        sink.process(StreamEvent::Complete(StreamSummary::new(5, 100)))
            .unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
        let expected: Vec<TestRecord> = (0..5).map(numbered_record).collect();
        // Order and content must both survive the round trip
        assert_eq!(parsed, expected);
    }

    #[test]
    fn test_json_streaming_sink_pretty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::pretty(path.clone()).unwrap();

        sink.process(StreamEvent::Data(sample_record())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        // Pretty printed should have newlines and indentation
        assert!(content.contains("\n"));
        assert!(content.contains("  "));

        // Formatting must not break validity: the output still has to parse
        let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
        assert_eq!(parsed, vec![sample_record()]);
        assert_eq!(sink.bytes_written(), content.len() as u64);
    }

    #[test]
    fn test_ndjson_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        for i in 0..3 {
            sink.process(StreamEvent::Data(numbered_record(i))).unwrap();
        }
        // `close` consumes the sink, so capture the counter first
        let reported_bytes = sink.bytes_written();
        sink.close().unwrap();

        // Read back and verify line by line
        let content = std::fs::read_to_string(&path).unwrap();
        assert_eq!(reported_bytes, content.len() as u64);

        let lines: Vec<_> = content.lines().collect();
        assert_eq!(lines.len(), 3);

        // Each line should be valid JSON
        for (i, line) in lines.iter().enumerate() {
            let record: TestRecord = serde_json::from_str(line).unwrap();
            assert_eq!(record, numbered_record(i as u32));
        }
    }

    #[test]
    fn test_ndjson_items_processed() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(path).unwrap();

        for i in 0..10 {
            sink.process(StreamEvent::Data(numbered_record(i))).unwrap();
        }

        assert_eq!(sink.items_processed(), 10);
    }
}