use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::Arc;
use datasynth_core::error::{SynthError, SynthResult};
use datasynth_core::models::JournalEntry;
use datasynth_core::traits::Sink;
use datasynth_core::{DiskSpaceGuard, DiskSpaceGuardConfig};
pub struct JsonLinesSink {
writer: BufWriter<File>,
items_written: u64,
bytes_written: u64,
disk_guard: Option<Arc<DiskSpaceGuard>>,
check_interval: u64,
}
impl JsonLinesSink {
pub fn new(path: PathBuf) -> SynthResult<Self> {
let file = File::create(&path)?;
Ok(Self {
writer: BufWriter::with_capacity(256 * 1024, file),
items_written: 0,
bytes_written: 0,
disk_guard: None,
check_interval: 500,
})
}
pub fn with_disk_guard(path: PathBuf, min_free_mb: usize) -> SynthResult<Self> {
let file = File::create(&path)?;
let disk_config = DiskSpaceGuardConfig::with_min_free_mb(min_free_mb).with_path(&path);
let disk_guard = Arc::new(DiskSpaceGuard::new(disk_config));
Ok(Self {
writer: BufWriter::with_capacity(256 * 1024, file),
items_written: 0,
bytes_written: 0,
disk_guard: Some(disk_guard),
check_interval: 500,
})
}
pub fn set_disk_guard(&mut self, guard: Arc<DiskSpaceGuard>) {
self.disk_guard = Some(guard);
}
pub fn set_check_interval(&mut self, interval: u64) {
self.check_interval = interval;
}
fn check_disk_space(&self) -> SynthResult<()> {
if let Some(guard) = &self.disk_guard {
if self.items_written.is_multiple_of(self.check_interval) {
guard
.check()
.map_err(|e| SynthError::disk_exhausted(e.available_mb, e.required_mb))?;
}
}
Ok(())
}
fn record_write(&self, bytes: u64) {
if let Some(guard) = &self.disk_guard {
guard.record_write(bytes);
}
}
pub fn bytes_written(&self) -> u64 {
self.bytes_written
}
}
impl Sink for JsonLinesSink {
type Item = JournalEntry;
fn write(&mut self, item: Self::Item) -> SynthResult<()> {
self.check_disk_space()?;
serde_json::to_writer(&mut self.writer, &item)
.map_err(|e| SynthError::SerializationError(e.to_string()))?;
self.writer.write_all(b"\n")?;
self.bytes_written += 200; self.record_write(200);
self.items_written += 1;
Ok(())
}
fn flush(&mut self) -> SynthResult<()> {
self.writer.flush()?;
Ok(())
}
fn close(mut self) -> SynthResult<()> {
self.flush()?;
Ok(())
}
fn items_written(&self) -> u64 {
self.items_written
}
}