datasynth_output/streaming/
json_sink.rs1use std::fs::File;
6use std::io::{BufWriter, Write};
7use std::marker::PhantomData;
8use std::path::PathBuf;
9
10use serde::Serialize;
11
12use datasynth_core::error::{SynthError, SynthResult};
13use datasynth_core::traits::{StreamEvent, StreamingSink};
14
15pub struct JsonStreamingSink<T> {
29 writer: BufWriter<File>,
30 items_written: u64,
31 bytes_written: u64,
32 is_first: bool,
33 path: PathBuf,
34 pretty_print: bool,
35 _phantom: PhantomData<T>,
36}
37
38impl<T: Serialize + Send> JsonStreamingSink<T> {
39 pub fn new(path: PathBuf) -> SynthResult<Self> {
49 Self::with_options(path, false)
50 }
51
52 pub fn pretty(path: PathBuf) -> SynthResult<Self> {
54 Self::with_options(path, true)
55 }
56
57 fn with_options(path: PathBuf, pretty_print: bool) -> SynthResult<Self> {
59 let file = File::create(&path)?;
60 let mut writer = BufWriter::new(file);
61
62 let opening = if pretty_print { "[\n" } else { "[" };
64 writer.write_all(opening.as_bytes())?;
65
66 Ok(Self {
67 writer,
68 items_written: 0,
69 bytes_written: opening.len() as u64,
70 is_first: true,
71 path,
72 pretty_print,
73 _phantom: PhantomData,
74 })
75 }
76
77 pub fn path(&self) -> &PathBuf {
79 &self.path
80 }
81
82 pub fn bytes_written(&self) -> u64 {
84 self.bytes_written
85 }
86
87 fn write_item(&mut self, item: &T) -> SynthResult<()> {
89 if !self.is_first {
91 let sep = if self.pretty_print { ",\n" } else { "," };
92 self.writer.write_all(sep.as_bytes())?;
93 self.bytes_written += sep.len() as u64;
94 }
95 self.is_first = false;
96
97 let json = if self.pretty_print {
99 let mut json = serde_json::to_string_pretty(item).map_err(|e| {
100 SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
101 })?;
102 json = json
104 .lines()
105 .map(|line| format!(" {}", line))
106 .collect::<Vec<_>>()
107 .join("\n");
108 json
109 } else {
110 serde_json::to_string(item).map_err(|e| {
111 SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
112 })?
113 };
114
115 self.writer.write_all(json.as_bytes())?;
116 self.bytes_written += json.len() as u64;
117 self.items_written += 1;
118
119 Ok(())
120 }
121
122 fn finalize(&mut self) -> SynthResult<()> {
124 let closing = if self.pretty_print { "\n]" } else { "]" };
125 self.writer.write_all(closing.as_bytes())?;
126 self.bytes_written += closing.len() as u64;
127 self.writer.flush()?;
128 Ok(())
129 }
130}
131
132impl<T: Serialize + Send> StreamingSink<T> for JsonStreamingSink<T> {
133 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
134 match event {
135 StreamEvent::Data(item) => {
136 self.write_item(&item)?;
137 }
138 StreamEvent::Complete(_summary) => {
139 self.finalize()?;
140 }
141 StreamEvent::BatchComplete { .. } => {
142 self.writer.flush()?;
143 }
144 StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
145 }
146 Ok(())
147 }
148
149 fn flush(&mut self) -> SynthResult<()> {
150 self.writer.flush()?;
151 Ok(())
152 }
153
154 fn close(mut self) -> SynthResult<()> {
155 self.finalize()?;
156 Ok(())
157 }
158
159 fn items_processed(&self) -> u64 {
160 self.items_written
161 }
162}
163
/// Streaming sink that writes newline-delimited JSON: one compact JSON
/// document per `Data` item, each terminated by `\n`.
pub struct NdjsonStreamingSink<T> {
    /// Destination file, reported back through `path()`.
    path: PathBuf,
    /// Buffered handle to the output file.
    writer: BufWriter<File>,
    /// Number of items serialized and written so far.
    items_written: u64,
    /// Bytes handed to `writer`, counting each line's trailing newline.
    bytes_written: u64,
    _phantom: PhantomData<T>,
}
184
185impl<T: Serialize + Send> NdjsonStreamingSink<T> {
186 pub fn new(path: PathBuf) -> SynthResult<Self> {
196 let file = File::create(&path)?;
197 Ok(Self {
198 writer: BufWriter::new(file),
199 items_written: 0,
200 bytes_written: 0,
201 path,
202 _phantom: PhantomData,
203 })
204 }
205
206 pub fn path(&self) -> &PathBuf {
208 &self.path
209 }
210
211 pub fn bytes_written(&self) -> u64 {
213 self.bytes_written
214 }
215
216 fn write_item(&mut self, item: &T) -> SynthResult<()> {
218 let json = serde_json::to_string(item).map_err(|e| {
219 SynthError::generation(format!("Failed to serialize item to JSON: {}", e))
220 })?;
221
222 self.writer.write_all(json.as_bytes())?;
223 self.writer.write_all(b"\n")?;
224 self.bytes_written += json.len() as u64 + 1;
225 self.items_written += 1;
226
227 Ok(())
228 }
229}
230
231impl<T: Serialize + Send> StreamingSink<T> for NdjsonStreamingSink<T> {
232 fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
233 match event {
234 StreamEvent::Data(item) => {
235 self.write_item(&item)?;
236 }
237 StreamEvent::Complete(_summary) => {
238 self.flush()?;
239 }
240 StreamEvent::BatchComplete { .. } => {
241 self.writer.flush()?;
242 }
243 StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
244 }
245 Ok(())
246 }
247
248 fn flush(&mut self) -> SynthResult<()> {
249 self.writer.flush()?;
250 Ok(())
251 }
252
253 fn close(mut self) -> SynthResult<()> {
254 self.flush()?;
255 Ok(())
256 }
257
258 fn items_processed(&self) -> u64 {
259 self.items_written
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_core::traits::StreamSummary;
    use serde::{Deserialize, Serialize};
    use tempfile::tempdir;

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestRecord {
        id: u32,
        name: String,
        value: f64,
    }

    /// Deterministic record for index `i`, used by the multi-item tests.
    fn numbered_record(i: u32) -> TestRecord {
        TestRecord {
            id: i,
            name: format!("item_{}", i),
            value: i as f64,
        }
    }

    #[test]
    fn test_json_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        let record = TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        };

        sink.process(StreamEvent::Data(record.clone())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
        assert_eq!(parsed, vec![record]);
        assert_eq!(sink.items_processed(), 1);
    }

    #[test]
    fn test_json_streaming_sink_empty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("empty.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(0, 100)))
            .unwrap();

        // An empty stream must still produce a valid (empty) JSON array.
        let content = std::fs::read_to_string(&path).unwrap();
        let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
        assert!(parsed.is_empty());
        assert_eq!(sink.items_processed(), 0);
    }

    #[test]
    fn test_json_streaming_sink_multiple_items() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        let expected: Vec<TestRecord> = (0..5).map(numbered_record).collect();
        for record in &expected {
            sink.process(StreamEvent::Data(record.clone())).unwrap();
        }
        sink.process(StreamEvent::Complete(StreamSummary::new(5, 100)))
            .unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
        assert_eq!(parsed, expected);
        // The byte counter must match what actually landed on disk.
        assert_eq!(sink.bytes_written(), content.len() as u64);
    }

    #[test]
    fn test_json_streaming_sink_pretty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");

        let mut sink = JsonStreamingSink::<TestRecord>::pretty(path.clone()).unwrap();

        let record = TestRecord {
            id: 1,
            name: "test".to_string(),
            value: 42.5,
        };

        sink.process(StreamEvent::Data(record.clone())).unwrap();
        sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
            .unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        assert!(content.starts_with("[\n"));
        assert!(content.ends_with("\n]"));
        // Pretty output must still be valid JSON carrying the same data.
        let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
        assert_eq!(parsed, vec![record]);
    }

    #[test]
    fn test_ndjson_streaming_sink_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();

        let expected: Vec<TestRecord> = (0..3).map(numbered_record).collect();
        for record in &expected {
            sink.process(StreamEvent::Data(record.clone())).unwrap();
        }
        let reported_bytes = sink.bytes_written();
        sink.close().unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        assert_eq!(content.len() as u64, reported_bytes);

        // One JSON document per line, in input order.
        let parsed: Vec<TestRecord> = content
            .lines()
            .map(|line| serde_json::from_str(line).unwrap())
            .collect();
        assert_eq!(parsed, expected);
    }

    #[test]
    fn test_ndjson_items_processed() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.ndjson");

        let mut sink = NdjsonStreamingSink::<TestRecord>::new(path).unwrap();
        assert_eq!(sink.items_processed(), 0);

        for i in 0..10 {
            sink.process(StreamEvent::Data(numbered_record(i))).unwrap();
        }

        assert_eq!(sink.items_processed(), 10);
        sink.close().unwrap();
    }
}