datasynth_output/streaming/
json_sink.rs1use std::fs::File;
8use std::io::{BufWriter, Write};
9use std::marker::PhantomData;
10use std::path::PathBuf;
11
12use serde::Serialize;
13
14use datasynth_core::error::{SynthError, SynthResult};
15use datasynth_core::traits::{StreamEvent, StreamingSink};
16
17pub struct JsonStreamingSink<T> {
31 writer: BufWriter<File>,
32 items_written: u64,
33 bytes_written: u64,
34 is_first: bool,
35 path: PathBuf,
36 pretty_print: bool,
37 _phantom: PhantomData<T>,
38}
39
40impl<T: Serialize + Send> JsonStreamingSink<T> {
41 pub fn new(path: PathBuf) -> SynthResult<Self> {
51 Self::with_options(path, false)
52 }
53
54 pub fn pretty(path: PathBuf) -> SynthResult<Self> {
56 Self::with_options(path, true)
57 }
58
59 fn with_options(path: PathBuf, pretty_print: bool) -> SynthResult<Self> {
61 let file = File::create(&path)?;
62 let mut writer = BufWriter::with_capacity(256 * 1024, file);
63
64 let opening = if pretty_print { "[\n" } else { "[" };
66 writer.write_all(opening.as_bytes())?;
67
68 Ok(Self {
69 writer,
70 items_written: 0,
71 bytes_written: opening.len() as u64,
72 is_first: true,
73 path,
74 pretty_print,
75 _phantom: PhantomData,
76 })
77 }
78
79 pub fn path(&self) -> &PathBuf {
81 &self.path
82 }
83
84 pub fn bytes_written(&self) -> u64 {
86 self.bytes_written
87 }
88
89 fn write_item(&mut self, item: &T) -> SynthResult<()> {
94 if !self.is_first {
96 let sep = if self.pretty_print { ",\n" } else { "," };
97 self.writer.write_all(sep.as_bytes())?;
98 self.bytes_written += sep.len() as u64;
99 }
100 self.is_first = false;
101
102 if self.pretty_print {
103 let json = serde_json::to_string_pretty(item).map_err(|e| {
105 SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
106 })?;
107 for (i, line) in json.lines().enumerate() {
109 if i > 0 {
110 self.writer.write_all(b"\n")?;
111 }
112 self.writer.write_all(b" ")?;
113 self.writer.write_all(line.as_bytes())?;
114 }
115 self.bytes_written += json.len() as u64;
116 } else {
117 serde_json::to_writer(&mut self.writer, item).map_err(|e| {
119 SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
120 })?;
121 self.bytes_written += 100; }
123
124 self.items_written += 1;
125 Ok(())
126 }
127
128 fn finalize(&mut self) -> SynthResult<()> {
130 let closing = if self.pretty_print { "\n]" } else { "]" };
131 self.writer.write_all(closing.as_bytes())?;
132 self.bytes_written += closing.len() as u64;
133 self.writer.flush()?;
134 Ok(())
135 }
136}
137
138impl<T: Serialize + Send> StreamingSink<T> for JsonStreamingSink<T> {
139 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
140 match event {
141 StreamEvent::Data(item) => {
142 self.write_item(&item)?;
143 }
144 StreamEvent::Complete(_summary) => {
145 self.finalize()?;
146 }
147 StreamEvent::BatchComplete { .. } => {
148 self.writer.flush()?;
149 }
150 StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
151 }
152 Ok(())
153 }
154
155 fn flush(&mut self) -> SynthResult<()> {
156 self.writer.flush()?;
157 Ok(())
158 }
159
160 fn close(mut self) -> SynthResult<()> {
161 self.finalize()?;
162 Ok(())
163 }
164
165 fn items_processed(&self) -> u64 {
166 self.items_written
167 }
168}
169
/// Streams items of type `T` into a file as newline-delimited JSON: one JSON
/// document per line.
pub struct NdjsonStreamingSink<T> {
    /// Location of the output file.
    path: PathBuf,
    /// Buffered handle to the output file.
    writer: BufWriter<File>,
    /// Count of records serialized so far.
    items_written: u64,
    /// Byte counter maintained by `write_item` and reported via `bytes_written()`.
    bytes_written: u64,
    /// Ties the sink to its item type without storing any `T`.
    _phantom: PhantomData<T>,
}
190
191impl<T: Serialize + Send> NdjsonStreamingSink<T> {
192 pub fn new(path: PathBuf) -> SynthResult<Self> {
202 let file = File::create(&path)?;
203 Ok(Self {
204 writer: BufWriter::with_capacity(256 * 1024, file),
205 items_written: 0,
206 bytes_written: 0,
207 path,
208 _phantom: PhantomData,
209 })
210 }
211
212 pub fn path(&self) -> &PathBuf {
214 &self.path
215 }
216
217 pub fn bytes_written(&self) -> u64 {
219 self.bytes_written
220 }
221
222 fn write_item(&mut self, item: &T) -> SynthResult<()> {
226 serde_json::to_writer(&mut self.writer, item).map_err(|e| {
227 SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
228 })?;
229
230 self.writer.write_all(b"\n")?;
231 self.bytes_written += 100; self.items_written += 1;
233
234 Ok(())
235 }
236}
237
238impl<T: Serialize + Send> StreamingSink<T> for NdjsonStreamingSink<T> {
239 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
240 match event {
241 StreamEvent::Data(item) => {
242 self.write_item(&item)?;
243 }
244 StreamEvent::Complete(_summary) => {
245 self.flush()?;
246 }
247 StreamEvent::BatchComplete { .. } => {
248 self.writer.flush()?;
249 }
250 StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
251 }
252 Ok(())
253 }
254
255 fn flush(&mut self) -> SynthResult<()> {
256 self.writer.flush()?;
257 Ok(())
258 }
259
260 fn close(mut self) -> SynthResult<()> {
261 self.flush()?;
262 Ok(())
263 }
264
265 fn items_processed(&self) -> u64 {
266 self.items_written
267 }
268}
269
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_core::traits::StreamSummary;
    use serde::{Deserialize, Serialize};
    use tempfile::tempdir;

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestRecord {
        id: u32,
        name: String,
        value: f64,
    }

    /// The single fixed record used by the one-item tests.
    fn single_record() -> TestRecord {
        TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        }
    }

    /// A record whose fields are all derived from `i`.
    fn numbered_record(i: u32) -> TestRecord {
        TestRecord {
            id: i,
            name: format!("item_{}", i),
            value: f64::from(i),
        }
    }

    #[test]
    fn test_json_streaming_sink_basic() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(out_path.clone()).unwrap();
        sink.process(StreamEvent::Data(single_record())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        // The finished file must be a valid JSON array.
        let text = std::fs::read_to_string(&out_path).unwrap();
        let records: Vec<TestRecord> = serde_json::from_str(&text).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].id, 1);
    }

    #[test]
    fn test_json_streaming_sink_multiple_items() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(out_path.clone()).unwrap();
        for record in (0..5).map(numbered_record) {
            sink.process(StreamEvent::Data(record)).unwrap();
        }
        sink.process(StreamEvent::Complete(StreamSummary::new(5, 100)))
            .unwrap();

        let text = std::fs::read_to_string(&out_path).unwrap();
        let records: Vec<TestRecord> = serde_json::from_str(&text).unwrap();
        assert_eq!(records.len(), 5);
    }

    #[test]
    fn test_json_streaming_sink_pretty() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::pretty(out_path.clone()).unwrap();
        sink.process(StreamEvent::Data(single_record())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        // Pretty output is multi-line and indented.
        let text = std::fs::read_to_string(&out_path).unwrap();
        assert!(text.contains("\n"));
        assert!(text.contains(" "));
    }

    #[test]
    fn test_ndjson_streaming_sink_basic() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(out_path.clone()).unwrap();
        for record in (0..3).map(numbered_record) {
            sink.process(StreamEvent::Data(record)).unwrap();
        }
        sink.close().unwrap();

        // One JSON document per line, in insertion order.
        let text = std::fs::read_to_string(&out_path).unwrap();
        let lines: Vec<&str> = text.lines().collect();
        assert_eq!(lines.len(), 3);

        for (expected_id, line) in (0u32..).zip(lines.iter()) {
            let parsed: TestRecord = serde_json::from_str(line).unwrap();
            assert_eq!(parsed.id, expected_id);
        }
    }

    #[test]
    fn test_ndjson_items_processed() {
        let tmp = tempdir().unwrap();
        let out_path = tmp.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(out_path).unwrap();
        for record in (0..10).map(numbered_record) {
            sink.process(StreamEvent::Data(record)).unwrap();
        }

        assert_eq!(sink.items_processed(), 10);
    }
}