datasynth_output/streaming/
json_sink.rs1use std::fs::File;
6use std::io::{BufWriter, Write};
7use std::marker::PhantomData;
8use std::path::PathBuf;
9
10use serde::Serialize;
11
12use datasynth_core::error::{SynthError, SynthResult};
13use datasynth_core::traits::{StreamEvent, StreamingSink};
14
15pub struct JsonStreamingSink<T> {
29 writer: BufWriter<File>,
30 items_written: u64,
31 bytes_written: u64,
32 is_first: bool,
33 path: PathBuf,
34 pretty_print: bool,
35 _phantom: PhantomData<T>,
36}
37
38impl<T: Serialize + Send> JsonStreamingSink<T> {
39 pub fn new(path: PathBuf) -> SynthResult<Self> {
49 Self::with_options(path, false)
50 }
51
52 pub fn pretty(path: PathBuf) -> SynthResult<Self> {
54 Self::with_options(path, true)
55 }
56
57 fn with_options(path: PathBuf, pretty_print: bool) -> SynthResult<Self> {
59 let file = File::create(&path)?;
60 let mut writer = BufWriter::new(file);
61
62 let opening = if pretty_print { "[\n" } else { "[" };
64 writer.write_all(opening.as_bytes())?;
65
66 Ok(Self {
67 writer,
68 items_written: 0,
69 bytes_written: opening.len() as u64,
70 is_first: true,
71 path,
72 pretty_print,
73 _phantom: PhantomData,
74 })
75 }
76
77 pub fn path(&self) -> &PathBuf {
79 &self.path
80 }
81
82 pub fn bytes_written(&self) -> u64 {
84 self.bytes_written
85 }
86
87 fn write_item(&mut self, item: &T) -> SynthResult<()> {
89 if !self.is_first {
91 let sep = if self.pretty_print { ",\n" } else { "," };
92 self.writer.write_all(sep.as_bytes())?;
93 self.bytes_written += sep.len() as u64;
94 }
95 self.is_first = false;
96
97 let json = if self.pretty_print {
99 let mut json = serde_json::to_string_pretty(item).map_err(|e| {
100 SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
101 })?;
102 json = json
104 .lines()
105 .map(|line| format!(" {}", line))
106 .collect::<Vec<_>>()
107 .join("\n");
108 json
109 } else {
110 serde_json::to_string(item).map_err(|e| {
111 SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
112 })?
113 };
114
115 self.writer.write_all(json.as_bytes())?;
116 self.bytes_written += json.len() as u64;
117 self.items_written += 1;
118
119 Ok(())
120 }
121
122 fn finalize(&mut self) -> SynthResult<()> {
124 let closing = if self.pretty_print { "\n]" } else { "]" };
125 self.writer.write_all(closing.as_bytes())?;
126 self.bytes_written += closing.len() as u64;
127 self.writer.flush()?;
128 Ok(())
129 }
130}
131
132impl<T: Serialize + Send> StreamingSink<T> for JsonStreamingSink<T> {
133 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
134 match event {
135 StreamEvent::Data(item) => {
136 self.write_item(&item)?;
137 }
138 StreamEvent::Complete(_summary) => {
139 self.finalize()?;
140 }
141 StreamEvent::BatchComplete { .. } => {
142 self.writer.flush()?;
143 }
144 StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
145 }
146 Ok(())
147 }
148
149 fn flush(&mut self) -> SynthResult<()> {
150 self.writer.flush()?;
151 Ok(())
152 }
153
154 fn close(mut self) -> SynthResult<()> {
155 self.finalize()?;
156 Ok(())
157 }
158
159 fn items_processed(&self) -> u64 {
160 self.items_written
161 }
162}
163
/// Streaming sink that writes newline-delimited JSON.
///
/// Each `Data` item is serialized compactly onto its own line.
pub struct NdjsonStreamingSink<T> {
    /// Destination file path.
    path: PathBuf,
    /// Buffered handle to the output file.
    writer: BufWriter<File>,
    /// Number of items written so far.
    items_written: u64,
    /// Bytes handed to `writer`, trailing newlines included.
    bytes_written: u64,
    /// Ties the sink to item type `T` without storing any `T`.
    _phantom: PhantomData<T>,
}
184
185impl<T: Serialize + Send> NdjsonStreamingSink<T> {
186 pub fn new(path: PathBuf) -> SynthResult<Self> {
196 let file = File::create(&path)?;
197 Ok(Self {
198 writer: BufWriter::new(file),
199 items_written: 0,
200 bytes_written: 0,
201 path,
202 _phantom: PhantomData,
203 })
204 }
205
206 pub fn path(&self) -> &PathBuf {
208 &self.path
209 }
210
211 pub fn bytes_written(&self) -> u64 {
213 self.bytes_written
214 }
215
216 fn write_item(&mut self, item: &T) -> SynthResult<()> {
218 let json = serde_json::to_string(item).map_err(|e| {
219 SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
220 })?;
221
222 self.writer.write_all(json.as_bytes())?;
223 self.writer.write_all(b"\n")?;
224 self.bytes_written += json.len() as u64 + 1;
225 self.items_written += 1;
226
227 Ok(())
228 }
229}
230
231impl<T: Serialize + Send> StreamingSink<T> for NdjsonStreamingSink<T> {
232 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
233 match event {
234 StreamEvent::Data(item) => {
235 self.write_item(&item)?;
236 }
237 StreamEvent::Complete(_summary) => {
238 self.flush()?;
239 }
240 StreamEvent::BatchComplete { .. } => {
241 self.writer.flush()?;
242 }
243 StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
244 }
245 Ok(())
246 }
247
248 fn flush(&mut self) -> SynthResult<()> {
249 self.writer.flush()?;
250 Ok(())
251 }
252
253 fn close(mut self) -> SynthResult<()> {
254 self.flush()?;
255 Ok(())
256 }
257
258 fn items_processed(&self) -> u64 {
259 self.items_written
260 }
261}
262
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_core::traits::StreamSummary;
    use serde::{Deserialize, Serialize};
    use tempfile::tempdir;

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestRecord {
        id: u32,
        name: String,
        value: f64,
    }

    /// Single hand-written fixture used by the one-item tests.
    fn sample() -> TestRecord {
        TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        }
    }

    /// The `i`-th generated fixture: name `item_<i>`, value `i`.
    fn numbered(i: u32) -> TestRecord {
        TestRecord {
            id: i,
            name: format!("item_{}", i),
            value: f64::from(i),
        }
    }

    #[test]
    fn test_json_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();
        sink.process(StreamEvent::Data(sample())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        let text = std::fs::read_to_string(&path).unwrap();
        let decoded: Vec<TestRecord> = serde_json::from_str(&text).unwrap();
        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0].id, 1);
    }

    #[test]
    fn test_json_streaming_sink_multiple_items() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();
        for record in (0..5).map(numbered) {
            sink.process(StreamEvent::Data(record)).unwrap();
        }
        sink.process(StreamEvent::Complete(StreamSummary::new(5, 100)))
            .unwrap();

        let text = std::fs::read_to_string(&path).unwrap();
        let decoded: Vec<TestRecord> = serde_json::from_str(&text).unwrap();
        assert_eq!(decoded.len(), 5);
    }

    #[test]
    fn test_json_streaming_sink_pretty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::pretty(path.clone()).unwrap();
        sink.process(StreamEvent::Data(sample())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        let text = std::fs::read_to_string(&path).unwrap();
        assert!(text.contains("\n"));
        assert!(text.contains(" "));
    }

    #[test]
    fn test_ndjson_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();
        for record in (0..3).map(numbered) {
            sink.process(StreamEvent::Data(record)).unwrap();
        }
        sink.close().unwrap();

        let text = std::fs::read_to_string(&path).unwrap();
        let rows: Vec<&str> = text.lines().collect();
        assert_eq!(rows.len(), 3);

        for (expected_id, row) in (0u32..).zip(&rows) {
            let record: TestRecord = serde_json::from_str(row).unwrap();
            assert_eq!(record.id, expected_id);
        }
    }

    #[test]
    fn test_ndjson_items_processed() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(path).unwrap();
        for record in (0..10).map(numbered) {
            sink.process(StreamEvent::Data(record)).unwrap();
        }

        assert_eq!(sink.items_processed(), 10);
    }
}