use std::fs::File;
use std::io::{BufWriter, Write};
use std::marker::PhantomData;
use std::path::PathBuf;
use serde::Serialize;
use datasynth_core::error::{SynthError, SynthResult};
use datasynth_core::traits::{StreamEvent, StreamingSink};
pub struct JsonStreamingSink<T> {
writer: BufWriter<File>,
items_written: u64,
bytes_written: u64,
is_first: bool,
path: PathBuf,
pretty_print: bool,
_phantom: PhantomData<T>,
}
impl<T: Serialize + Send> JsonStreamingSink<T> {
pub fn new(path: PathBuf) -> SynthResult<Self> {
Self::with_options(path, false)
}
pub fn pretty(path: PathBuf) -> SynthResult<Self> {
Self::with_options(path, true)
}
fn with_options(path: PathBuf, pretty_print: bool) -> SynthResult<Self> {
let file = File::create(&path)?;
let mut writer = BufWriter::with_capacity(256 * 1024, file);
let opening = if pretty_print { "[\n" } else { "[" };
writer.write_all(opening.as_bytes())?;
Ok(Self {
writer,
items_written: 0,
bytes_written: opening.len() as u64,
is_first: true,
path,
pretty_print,
_phantom: PhantomData,
})
}
pub fn path(&self) -> &PathBuf {
&self.path
}
pub fn bytes_written(&self) -> u64 {
self.bytes_written
}
fn write_item(&mut self, item: &T) -> SynthResult<()> {
if !self.is_first {
let sep = if self.pretty_print { ",\n" } else { "," };
self.writer.write_all(sep.as_bytes())?;
self.bytes_written += sep.len() as u64;
}
self.is_first = false;
if self.pretty_print {
let json = serde_json::to_string_pretty(item).map_err(|e| {
SynthError::generation(format!("Failed to serialize item to JSON: {e}"))
})?;
for (i, line) in json.lines().enumerate() {
if i > 0 {
self.writer.write_all(b"\n")?;
}
self.writer.write_all(b" ")?;
self.writer.write_all(line.as_bytes())?;
}
self.bytes_written += json.len() as u64;
} else {
serde_json::to_writer(&mut self.writer, item).map_err(|e| {
SynthError::generation(format!("Failed to serialize item to JSON: {e}"))
})?;
self.bytes_written += 100; }
self.items_written += 1;
Ok(())
}
fn finalize(&mut self) -> SynthResult<()> {
let closing = if self.pretty_print { "\n]" } else { "]" };
self.writer.write_all(closing.as_bytes())?;
self.bytes_written += closing.len() as u64;
self.writer.flush()?;
Ok(())
}
}
impl<T: Serialize + Send> StreamingSink<T> for JsonStreamingSink<T> {
fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
match event {
StreamEvent::Data(item) => {
self.write_item(&item)?;
}
StreamEvent::Complete(_summary) => {
self.finalize()?;
}
StreamEvent::BatchComplete { .. } => {
self.writer.flush()?;
}
StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
}
Ok(())
}
fn flush(&mut self) -> SynthResult<()> {
self.writer.flush()?;
Ok(())
}
fn close(mut self) -> SynthResult<()> {
self.finalize()?;
Ok(())
}
fn items_processed(&self) -> u64 {
self.items_written
}
}
pub struct NdjsonStreamingSink<T> {
writer: BufWriter<File>,
items_written: u64,
bytes_written: u64,
path: PathBuf,
_phantom: PhantomData<T>,
}
impl<T: Serialize + Send> NdjsonStreamingSink<T> {
pub fn new(path: PathBuf) -> SynthResult<Self> {
let file = File::create(&path)?;
Ok(Self {
writer: BufWriter::with_capacity(256 * 1024, file),
items_written: 0,
bytes_written: 0,
path,
_phantom: PhantomData,
})
}
pub fn path(&self) -> &PathBuf {
&self.path
}
pub fn bytes_written(&self) -> u64 {
self.bytes_written
}
fn write_item(&mut self, item: &T) -> SynthResult<()> {
serde_json::to_writer(&mut self.writer, item).map_err(|e| {
SynthError::generation(format!("Failed to serialize item to JSON: {e}"))
})?;
self.writer.write_all(b"\n")?;
self.bytes_written += 100; self.items_written += 1;
Ok(())
}
}
impl<T: Serialize + Send> StreamingSink<T> for NdjsonStreamingSink<T> {
fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
match event {
StreamEvent::Data(item) => {
self.write_item(&item)?;
}
StreamEvent::Complete(_summary) => {
self.flush()?;
}
StreamEvent::BatchComplete { .. } => {
self.writer.flush()?;
}
StreamEvent::Progress(_) | StreamEvent::Error(_) => {}
}
Ok(())
}
fn flush(&mut self) -> SynthResult<()> {
self.writer.flush()?;
Ok(())
}
fn close(mut self) -> SynthResult<()> {
self.flush()?;
Ok(())
}
fn items_processed(&self) -> u64 {
self.items_written
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use datasynth_core::traits::StreamSummary;
use serde::{Deserialize, Serialize};
use tempfile::tempdir;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct TestRecord {
id: u32,
name: String,
value: f64,
}
#[test]
fn test_json_streaming_sink_basic() {
let dir = tempdir().unwrap();
let path = dir.path().join("test.json");
let mut sink = JsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();
let record = TestRecord {
id: 1,
name: "test".to_string(),
value: 42.5,
};
sink.process(StreamEvent::Data(record)).unwrap();
sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
.unwrap();
let content = std::fs::read_to_string(&path).unwrap();
let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].id, 1);
}
#[test]
fn test_json_streaming_sink_multiple_items() {
let dir = tempdir().unwrap();
let path = dir.path().join("test.json");
let mut sink = JsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();
for i in 0..5 {
let record = TestRecord {
id: i,
name: format!("item_{}", i),
value: i as f64,
};
sink.process(StreamEvent::Data(record)).unwrap();
}
sink.process(StreamEvent::Complete(StreamSummary::new(5, 100)))
.unwrap();
let content = std::fs::read_to_string(&path).unwrap();
let parsed: Vec<TestRecord> = serde_json::from_str(&content).unwrap();
assert_eq!(parsed.len(), 5);
}
#[test]
fn test_json_streaming_sink_pretty() {
let dir = tempdir().unwrap();
let path = dir.path().join("test.json");
let mut sink = JsonStreamingSink::<TestRecord>::pretty(path.clone()).unwrap();
let record = TestRecord {
id: 1,
name: "test".to_string(),
value: 42.5,
};
sink.process(StreamEvent::Data(record)).unwrap();
sink.process(StreamEvent::Complete(StreamSummary::new(1, 100)))
.unwrap();
let content = std::fs::read_to_string(&path).unwrap();
assert!(content.contains("\n"));
assert!(content.contains(" "));
}
#[test]
fn test_ndjson_streaming_sink_basic() {
let dir = tempdir().unwrap();
let path = dir.path().join("test.ndjson");
let mut sink = NdjsonStreamingSink::<TestRecord>::new(path.clone()).unwrap();
for i in 0..3 {
let record = TestRecord {
id: i,
name: format!("item_{}", i),
value: i as f64,
};
sink.process(StreamEvent::Data(record)).unwrap();
}
sink.close().unwrap();
let content = std::fs::read_to_string(&path).unwrap();
let lines: Vec<_> = content.lines().collect();
assert_eq!(lines.len(), 3);
for (i, line) in lines.iter().enumerate() {
let record: TestRecord = serde_json::from_str(line).unwrap();
assert_eq!(record.id, i as u32);
}
}
#[test]
fn test_ndjson_items_processed() {
let dir = tempdir().unwrap();
let path = dir.path().join("test.ndjson");
let mut sink = NdjsonStreamingSink::<TestRecord>::new(path).unwrap();
for i in 0..10 {
let record = TestRecord {
id: i,
name: format!("item_{}", i),
value: i as f64,
};
sink.process(StreamEvent::Data(record)).unwrap();
}
assert_eq!(sink.items_processed(), 10);
}
}