datasynth_output/streaming/
json_sink.rs1use std::fs::File;
8use std::io::{BufWriter, Write};
9use std::marker::PhantomData;
10use std::path::PathBuf;
11
12use serde::Serialize;
13
14use datasynth_core::error::{SynthError, SynthResult};
15use datasynth_core::traits::{StreamEvent, StreamingSink};
16
17pub struct JsonStreamingSink<T> {
31 writer: BufWriter<File>,
32 items_written: u64,
33 bytes_written: u64,
34 is_first: bool,
35 path: PathBuf,
36 pretty_print: bool,
37 _phantom: PhantomData<T>,
38}
39
40impl<T: Serialize + Send> JsonStreamingSink<T> {
41 pub fn new(path: PathBuf) -> SynthResult<Self> {
51 Self::with_options(path, false)
52 }
53
54 pub fn pretty(path: PathBuf) -> SynthResult<Self> {
56 Self::with_options(path, true)
57 }
58
59 fn with_options(path: PathBuf, pretty_print: bool) -> SynthResult<Self> {
61 let file = File::create(&path)?;
62 let mut writer = BufWriter::with_capacity(256 * 1024, file);
63
64 let opening = if pretty_print { "[\n" } else { "[" };
66 writer.write_all(opening.as_bytes())?;
67
68 Ok(Self {
69 writer,
70 items_written: 0,
71 bytes_written: opening.len() as u64,
72 is_first: true,
73 path,
74 pretty_print,
75 _phantom: PhantomData,
76 })
77 }
78
79 pub fn path(&self) -> &PathBuf {
81 &self.path
82 }
83
84 pub fn bytes_written(&self) -> u64 {
86 self.bytes_written
87 }
88
89 fn write_item(&mut self, item: &T) -> SynthResult<()> {
94 if !self.is_first {
96 let sep = if self.pretty_print { ",\n" } else { "," };
97 self.writer.write_all(sep.as_bytes())?;
98 self.bytes_written += sep.len() as u64;
99 }
100 self.is_first = false;
101
102 if self.pretty_print {
103 let json = serde_json::to_string_pretty(item).map_err(|e| {
105 SynthError::generation(format!("Failed to serialize item to JSON: {e}"))
106 })?;
107 for (i, line) in json.lines().enumerate() {
109 if i > 0 {
110 self.writer.write_all(b"\n")?;
111 }
112 self.writer.write_all(b" ")?;
113 self.writer.write_all(line.as_bytes())?;
114 }
115 self.bytes_written += json.len() as u64;
116 } else {
117 serde_json::to_writer(&mut self.writer, item).map_err(|e| {
119 SynthError::generation(format!("Failed to serialize item to JSON: {e}"))
120 })?;
121 self.bytes_written += 100; }
123
124 self.items_written += 1;
125 Ok(())
126 }
127
128 fn finalize(&mut self) -> SynthResult<()> {
130 let closing = if self.pretty_print { "\n]" } else { "]" };
131 self.writer.write_all(closing.as_bytes())?;
132 self.bytes_written += closing.len() as u64;
133 self.writer.flush()?;
134 Ok(())
135 }
136}
137
138impl<T: Serialize + Send> StreamingSink<T> for JsonStreamingSink<T> {
139 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
140 match event {
141 StreamEvent::Data(item) => {
142 self.write_item(&item)?;
143 }
144 StreamEvent::Complete(_summary) => {
145 self.finalize()?;
146 }
147 StreamEvent::BatchComplete { .. } => {
148 self.writer.flush()?;
149 }
150 StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
151 }
152 Ok(())
153 }
154
155 fn flush(&mut self) -> SynthResult<()> {
156 self.writer.flush()?;
157 Ok(())
158 }
159
160 fn close(mut self) -> SynthResult<()> {
161 self.finalize()?;
162 Ok(())
163 }
164
165 fn items_processed(&self) -> u64 {
166 self.items_written
167 }
168}
169
170pub struct NdjsonStreamingSink<T> {
184 writer: BufWriter<File>,
185 items_written: u64,
186 bytes_written: u64,
187 path: PathBuf,
188 _phantom: PhantomData<T>,
189}
190
191impl<T: Serialize + Send> NdjsonStreamingSink<T> {
192 pub fn new(path: PathBuf) -> SynthResult<Self> {
202 let file = File::create(&path)?;
203 Ok(Self {
204 writer: BufWriter::with_capacity(256 * 1024, file),
205 items_written: 0,
206 bytes_written: 0,
207 path,
208 _phantom: PhantomData,
209 })
210 }
211
212 pub fn path(&self) -> &PathBuf {
214 &self.path
215 }
216
217 pub fn bytes_written(&self) -> u64 {
219 self.bytes_written
220 }
221
222 fn write_item(&mut self, item: &T) -> SynthResult<()> {
226 serde_json::to_writer(&mut self.writer, item).map_err(|e| {
227 SynthError::generation(format!("Failed to serialize item to JSON: {e}"))
228 })?;
229
230 self.writer.write_all(b"\n")?;
231 self.bytes_written += 100; self.items_written += 1;
233
234 Ok(())
235 }
236}
237
238impl<T: Serialize + Send> StreamingSink<T> for NdjsonStreamingSink<T> {
239 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
240 match event {
241 StreamEvent::Data(item) => {
242 self.write_item(&item)?;
243 }
244 StreamEvent::Complete(_summary) => {
245 self.flush()?;
246 }
247 StreamEvent::BatchComplete { .. } => {
248 self.writer.flush()?;
249 }
250 StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
251 }
252 Ok(())
253 }
254
255 fn flush(&mut self) -> SynthResult<()> {
256 self.writer.flush()?;
257 Ok(())
258 }
259
260 fn close(mut self) -> SynthResult<()> {
261 self.flush()?;
262 Ok(())
263 }
264
265 fn items_processed(&self) -> u64 {
266 self.items_written
267 }
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_core::traits::StreamSummary;
    use serde::{Deserialize, Serialize};
    use tempfile::tempdir;

    /// Minimal serializable record used as the sink payload in these tests.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestRecord {
        id: u32,
        name: String,
        value: f64,
    }

    /// Builds the fixed single record used by the one-item tests.
    fn sample_record() -> TestRecord {
        TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        }
    }

    /// Builds the `i`-th record of a numbered sequence.
    fn numbered_record(i: u32) -> TestRecord {
        TestRecord {
            id: i,
            name: format!("item_{}", i),
            value: i as f64,
        }
    }

    /// A single item followed by `Complete` yields a parseable one-element array.
    #[test]
    fn test_json_streaming_sink_basic() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(out_path.clone()).unwrap();
        sink.process(StreamEvent::Data(sample_record())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        let text = std::fs::read_to_string(&out_path).unwrap();
        let records: Vec<TestRecord> = serde_json::from_str(&text).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].id, 1);
    }

    /// Several items followed by `Complete` yield an array with one element per item.
    #[test]
    fn test_json_streaming_sink_multiple_items() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(out_path.clone()).unwrap();
        (0..5)
            .map(numbered_record)
            .for_each(|rec| sink.process(StreamEvent::Data(rec)).unwrap());
        sink.process(StreamEvent::Complete(StreamSummary::new(5, 100)))
            .unwrap();

        let text = std::fs::read_to_string(&out_path).unwrap();
        let records: Vec<TestRecord> = serde_json::from_str(&text).unwrap();
        assert_eq!(records.len(), 5);
    }

    /// Pretty mode emits line breaks and indentation.
    #[test]
    fn test_json_streaming_sink_pretty() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::pretty(out_path.clone()).unwrap();
        sink.process(StreamEvent::Data(sample_record())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        let text = std::fs::read_to_string(&out_path).unwrap();
        assert!(text.contains("\n"));
        assert!(text.contains(" "));
    }

    /// Each item ends up on its own line and parses back to the record that was written.
    #[test]
    fn test_ndjson_streaming_sink_basic() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(out_path.clone()).unwrap();
        (0..3)
            .map(numbered_record)
            .for_each(|rec| sink.process(StreamEvent::Data(rec)).unwrap());
        sink.close().unwrap();

        let text = std::fs::read_to_string(&out_path).unwrap();
        let rows: Vec<_> = text.lines().collect();
        assert_eq!(rows.len(), 3);

        for (idx, row) in rows.iter().enumerate() {
            let parsed: TestRecord = serde_json::from_str(row).unwrap();
            assert_eq!(parsed.id, idx as u32);
        }
    }

    /// `items_processed` reports the number of data events handled.
    #[test]
    fn test_ndjson_items_processed() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(out_path).unwrap();
        (0..10)
            .map(numbered_record)
            .for_each(|rec| sink.process(StreamEvent::Data(rec)).unwrap());

        assert_eq!(sink.items_processed(), 10);
    }
}