use std::path::{Path, PathBuf};
use tokio::io::{AsyncWriteExt, BufWriter};
use crate::core::{Error, Event, Result};
use crate::sink::SinkAdapter;
/// Queued-line count at which `send` writes the in-memory batch out.
const BATCH_MAX_LINES: usize = 128;

/// Queued-byte total (256 KiB) at which `send` writes the in-memory batch out.
const BATCH_MAX_BYTES: usize = 256 * 1024;

/// Capacity (64 KiB) of the `BufWriter` wrapped around the output file.
const WRITE_BUF_CAPACITY: usize = 64 * 1024;
/// Tuning knobs for `FileJsonlSink::open_with`.
#[derive(Debug, Clone)]
pub struct FileJsonlSinkConfig {
    /// Size threshold, in bytes, past which the active file is rotated before
    /// the next write. `0` disables rotation.
    pub rotate_size_bytes: u64,
    /// `sync_data` is issued on every `fsync_every`-th buffered flush.
    /// `open_with` treats values below 1 as 1.
    pub fsync_every: u32,
}

impl Default for FileJsonlSinkConfig {
    /// Rotation disabled; `sync_data` after every flush.
    fn default() -> Self {
        let rotate_size_bytes = 0;
        let fsync_every = 1;
        Self {
            rotate_size_bytes,
            fsync_every,
        }
    }
}
/// Append-only sink that writes each [`Event`] as one JSON line to a local
/// file, with in-memory batching, a configurable `sync_data` cadence and
/// optional size-based rotation.
pub struct FileJsonlSink {
    // ---- active output file ----
    /// Path of the active file; rotated files are renamed siblings of it.
    path: PathBuf,
    /// Buffered handle on the active file.
    writer: BufWriter<tokio::fs::File>,
    /// Bytes attributed to the active file: its length when opened (0 right
    /// after a rotation) plus every line handed to `writer` since, whether or
    /// not it has been flushed yet.
    bytes_written: u64,

    // ---- policy ----
    /// Rotation threshold in bytes; `0` disables rotation.
    rotate_size_bytes: u64,
    /// `sync_data` runs on every `fsync_every`-th writer flush (clamped to >= 1).
    fsync_every: u32,

    // ---- counters ----
    /// Writer flushes performed (wrapping); drives the fsync cadence.
    flush_count: u32,
    /// Rotations performed by this instance; embedded in rotated file names.
    rotation_count: u64,
    /// Events accepted by `send`, counted when queued rather than when written.
    events_sent: u64,

    // ---- in-memory batch ----
    /// Encoded, newline-terminated events not yet handed to `writer`.
    pending_lines: Vec<Vec<u8>>,
    /// Sum of the lengths of `pending_lines`.
    pending_bytes: usize,
    /// Set once `close` succeeds; later `send`/`flush` calls fail.
    closed: bool,
}
impl std::fmt::Debug for FileJsonlSink {
    /// Summarises the sink's state. The writer handle and the raw queued bytes
    /// are omitted; only the queue length is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let queued_lines = self.pending_lines.len();
        let mut builder = f.debug_struct("FileJsonlSink");
        builder.field("path", &self.path);
        builder.field("closed", &self.closed);
        builder.field("events_sent", &self.events_sent);
        builder.field("pending_lines", &queued_lines);
        builder.field("pending_bytes", &self.pending_bytes);
        builder.field("bytes_written", &self.bytes_written);
        builder.field("rotation_count", &self.rotation_count);
        builder.finish()
    }
}
impl FileJsonlSink {
    /// Opens `path` for appending using [`FileJsonlSinkConfig::default`]
    /// (no rotation, `sync_data` after every flush).
    ///
    /// # Errors
    /// Same as [`FileJsonlSink::open_with`].
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        Self::open_with(path, FileJsonlSinkConfig::default())
    }

    /// Opens `path` for appending, creating the file and any missing parent
    /// directories first.
    ///
    /// This performs blocking `std::fs` calls; the resulting handle is then
    /// wrapped in a `tokio::fs::File`. The current length of an existing file
    /// seeds the rotation accounting, so appending to a file that is already
    /// near `config.rotate_size_bytes` may rotate on the first write.
    /// `config.fsync_every` is clamped to at least 1.
    ///
    /// # Errors
    /// Returns [`Error::StateError`] if the parent directory cannot be created
    /// or the file cannot be opened.
    pub fn open_with(path: impl AsRef<Path>, config: FileJsonlSinkConfig) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        if let Some(parent) = path.parent() {
            // A bare file name has an empty parent; there is nothing to create.
            if !parent.as_os_str().is_empty() {
                std::fs::create_dir_all(parent).map_err(|e| {
                    Error::StateError(format!(
                        "FileJsonlSink: could not create parent directory '{}': {e}",
                        parent.display()
                    ))
                })?;
            }
        }
        let std_file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .map_err(|e| {
                Error::StateError(format!(
                    "FileJsonlSink: could not open '{}': {e}",
                    path.display()
                ))
            })?;
        // Unreadable metadata is treated as an empty file (best effort).
        let bytes_written = std_file.metadata().map(|m| m.len()).unwrap_or(0);
        Ok(Self {
            writer: BufWriter::with_capacity(
                WRITE_BUF_CAPACITY,
                tokio::fs::File::from_std(std_file),
            ),
            path,
            rotate_size_bytes: config.rotate_size_bytes,
            // Guards the modulo test in `flush_writer` against a zero divisor.
            fsync_every: config.fsync_every.max(1),
            flush_count: 0,
            rotation_count: 0,
            bytes_written,
            events_sent: 0,
            pending_lines: Vec::new(),
            pending_bytes: 0,
            closed: false,
        })
    }

    /// Path of the active output file.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Number of events accepted by `send` (counted when queued).
    pub fn events_sent(&self) -> u64 {
        self.events_sent
    }

    /// Number of encoded events queued in memory and not yet handed to the
    /// file writer.
    pub fn queue_depth(&self) -> usize {
        self.pending_lines.len()
    }

    /// Builds the error returned by operations attempted after `close`.
    fn closed_err(&self) -> Error {
        Error::StateError(format!(
            "FileJsonlSink('{}') is closed",
            self.path.display()
        ))
    }

    /// Appends one encoded line to the buffered writer.
    ///
    /// When rotation is enabled, the active file is non-empty, and the line
    /// would push it past `rotate_size_bytes`, the file is rotated first. The
    /// `bytes_written > 0` guard means a line larger than the threshold is
    /// still written into a fresh file instead of rotating forever. Bytes may
    /// stay in the `BufWriter` until the next flush.
    async fn write_line(&mut self, line: &[u8]) -> Result<()> {
        if self.rotate_size_bytes > 0
            && self.bytes_written > 0
            && self.bytes_written + line.len() as u64 > self.rotate_size_bytes
        {
            self.rotate().await?;
        }
        self.writer.write_all(line).await.map_err(Error::IoError)?;
        self.bytes_written += line.len() as u64;
        Ok(())
    }

    /// Flushes the `BufWriter` to the file and, on every `fsync_every`-th
    /// flush, also calls `sync_data`.
    async fn flush_writer(&mut self) -> Result<()> {
        self.writer.flush().await.map_err(Error::IoError)?;
        self.flush_count = self.flush_count.wrapping_add(1);
        if self.flush_count.is_multiple_of(self.fsync_every) {
            self.writer
                .get_ref()
                .sync_data()
                .await
                .map_err(Error::IoError)?;
        }
        Ok(())
    }

    /// Hands every queued line to the writer in order, then flushes it.
    ///
    /// If a line cannot be written (I/O error or failed rotation), the lines
    /// already accepted by the writer are removed from the queue. The failing
    /// line and everything after it stay queued for the next attempt, and the
    /// error is returned. Lines accepted by the `BufWriter` but not yet flushed
    /// remain in its buffer and are retried by its next flush.
    ///
    /// Not cancellation-safe: if this future is dropped mid-flush, the lines
    /// not yet written are discarded.
    async fn flush_pending(&mut self) -> Result<()> {
        // Move the queue out so `write_line` can borrow `self` mutably.
        let mut lines = std::mem::take(&mut self.pending_lines);
        let mut written = 0usize;
        let mut failure = None;
        for line in &lines {
            match self.write_line(line).await {
                Ok(()) => written += 1,
                Err(e) => {
                    failure = Some(e);
                    break;
                }
            }
        }
        lines.drain(..written);
        self.pending_bytes = lines.iter().map(Vec::len).sum();
        // Reinstate the (possibly non-empty) remainder, reusing its allocation.
        self.pending_lines = lines;
        if let Some(e) = failure {
            return Err(e);
        }
        self.flush_writer().await
    }

    /// Closes out the active file and starts a new, empty one at `path`.
    ///
    /// The old file is flushed and `sync_data`'d, then renamed to
    /// [`rotated_path`]. A fresh file is opened at the original path.
    /// `rotation_count` is bumped before the rename, so a failed rename only
    /// skips a suffix number. If reopening fails after a successful rename,
    /// `writer` still targets the renamed file.
    ///
    /// NOTE(review): `rename` replaces an existing destination on Unix. A
    /// name collision (same millisecond and same `rotation_count`, e.g. two
    /// sink instances on one path) would overwrite an earlier rotated file.
    async fn rotate(&mut self) -> Result<()> {
        self.writer.flush().await.map_err(Error::IoError)?;
        self.writer
            .get_ref()
            .sync_data()
            .await
            .map_err(Error::IoError)?;
        let ts = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis())
            .unwrap_or(0);
        self.rotation_count = self.rotation_count.saturating_add(1);
        let rotated = rotated_path(&self.path, ts, self.rotation_count);
        tokio::fs::rename(&self.path, &rotated)
            .await
            .map_err(Error::IoError)?;
        let new_file = tokio::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)
            .await
            .map_err(Error::IoError)?;
        self.writer = BufWriter::with_capacity(WRITE_BUF_CAPACITY, new_file);
        self.bytes_written = 0;
        Ok(())
    }
}
/// Builds the name a rotated file is renamed to:
/// `<stem>.<timestamp_ms>.<rotation_count>[.<ext>]`, in the same directory as
/// `path`.
///
/// The stem and extension are copied as raw `OsStr`s. Non-UTF-8 file names
/// are therefore preserved instead of collapsing to a shared fallback, which
/// could make rotated names from different files collide. `events` is used
/// only when `path` has no file name at all (e.g. `/` or `..`).
fn rotated_path(path: &Path, timestamp_ms: u128, rotation_count: u64) -> PathBuf {
    let stem = path
        .file_stem()
        .unwrap_or_else(|| std::ffi::OsStr::new("events"));
    let mut file_name = std::ffi::OsString::from(stem);
    file_name.push(format!(".{timestamp_ms}.{rotation_count}"));
    if let Some(ext) = path.extension() {
        file_name.push(".");
        file_name.push(ext);
    }
    let parent = path.parent().unwrap_or_else(|| Path::new(""));
    parent.join(file_name)
}
impl SinkAdapter for FileJsonlSink {
fn name(&self) -> &str {
"file-jsonl"
}
fn queue_depth(&self) -> Option<usize> {
Some(self.pending_lines.len())
}
async fn send(&mut self, event: &Event) -> Result<()> {
if self.closed {
return Err(self.closed_err());
}
let mut line = serde_json::to_vec(event)?;
line.push(b'\n');
self.pending_bytes += line.len();
self.pending_lines.push(line);
self.events_sent += 1;
if self.pending_lines.len() >= BATCH_MAX_LINES || self.pending_bytes >= BATCH_MAX_BYTES {
self.flush_pending().await?;
}
Ok(())
}
async fn flush(&mut self) -> Result<()> {
if self.closed {
return Err(self.closed_err());
}
self.flush_pending().await
}
async fn close(&mut self) -> Result<()> {
if self.closed {
return Ok(());
}
self.flush_pending().await?;
self.writer
.get_ref()
.sync_data()
.await
.map_err(Error::IoError)?;
self.closed = true;
Ok(())
}
fn is_closed(&self) -> bool {
self.closed
}
fn delivery_metrics(&self) -> Option<crate::sink::SinkDeliveryMetrics> {
Some(crate::sink::SinkDeliveryMetrics {
events_sent: self.events_sent,
..Default::default()
})
}
}
#[cfg(test)]
mod tests {
    use std::io::BufRead;
    use tempfile::NamedTempFile;
    use crate::core::{Event, Operation, SourceMetadata, EVENT_ENVELOPE_VERSION};
    use crate::sink::SinkAdapter;
    use super::{FileJsonlSink, FileJsonlSinkConfig};

    /// Builds a minimal insert event for `table`.
    fn make_event(table: &str) -> Event {
        Event {
            before: None,
            after: Some(serde_json::json!({"id": 1})),
            op: Operation::Insert,
            source: SourceMetadata {
                source_name: "test".into(),
                offset: "0".into(),
                timestamp: 1,
            },
            ts: 1,
            schema: None,
            table: table.into(),
            primary_key: None,
            snapshot: None,
            transaction: None,
            envelope_version: EVENT_ENVELOPE_VERSION,
            before_is_key_only: false,
        }
    }

    /// Counts the lines in the file at `path`.
    fn count_lines(path: &std::path::Path) -> usize {
        let f = std::fs::File::open(path).unwrap();
        std::io::BufReader::new(f).lines().count()
    }

    #[tokio::test]
    async fn writes_events_as_json_lines() {
        let tmp = NamedTempFile::new().unwrap();
        let mut sink = FileJsonlSink::open(tmp.path()).unwrap();
        sink.send(&make_event("orders")).await.unwrap();
        sink.send(&make_event("products")).await.unwrap();
        sink.flush().await.unwrap();
        sink.close().await.unwrap();
        assert_eq!(count_lines(tmp.path()), 2);
    }

    #[tokio::test]
    async fn appends_across_opens() {
        let tmp = NamedTempFile::new().unwrap();
        {
            let mut sink = FileJsonlSink::open(tmp.path()).unwrap();
            sink.send(&make_event("orders")).await.unwrap();
            sink.close().await.unwrap();
        }
        {
            let mut sink = FileJsonlSink::open(tmp.path()).unwrap();
            sink.send(&make_event("products")).await.unwrap();
            sink.close().await.unwrap();
        }
        assert_eq!(count_lines(tmp.path()), 2);
    }

    #[tokio::test]
    async fn send_after_close_errors() {
        let tmp = NamedTempFile::new().unwrap();
        let mut sink = FileJsonlSink::open(tmp.path()).unwrap();
        sink.close().await.unwrap();
        assert!(sink.send(&make_event("orders")).await.is_err());
    }

    #[tokio::test]
    async fn flush_after_close_errors() {
        let tmp = NamedTempFile::new().unwrap();
        let mut sink = FileJsonlSink::open(tmp.path()).unwrap();
        sink.close().await.unwrap();
        assert!(sink.flush().await.is_err());
    }

    #[tokio::test]
    async fn events_sent_increments() {
        let tmp = NamedTempFile::new().unwrap();
        let mut sink = FileJsonlSink::open(tmp.path()).unwrap();
        assert_eq!(sink.events_sent(), 0);
        sink.send(&make_event("t1")).await.unwrap();
        sink.send(&make_event("t2")).await.unwrap();
        assert_eq!(sink.events_sent(), 2);
        sink.close().await.unwrap();
    }

    #[test]
    fn name_is_file_jsonl() {
        let tmp = NamedTempFile::new().unwrap();
        let sink = FileJsonlSink::open(tmp.path()).unwrap();
        assert_eq!(sink.name(), "file-jsonl");
    }

    #[test]
    fn path_returns_configured_path() {
        let tmp = NamedTempFile::new().unwrap();
        let sink = FileJsonlSink::open(tmp.path()).unwrap();
        assert_eq!(sink.path(), tmp.path());
    }

    #[tokio::test]
    async fn written_lines_are_valid_json() {
        let tmp = NamedTempFile::new().unwrap();
        let mut sink = FileJsonlSink::open(tmp.path()).unwrap();
        sink.send(&make_event("orders")).await.unwrap();
        sink.close().await.unwrap();
        let content = std::fs::read_to_string(tmp.path()).unwrap();
        for line in content.lines() {
            let _: serde_json::Value =
                serde_json::from_str(line).expect("each line must be valid JSON");
        }
    }

    #[tokio::test]
    async fn queue_depth_reflects_pending_sends() {
        let tmp = NamedTempFile::new().unwrap();
        let mut sink = FileJsonlSink::open(tmp.path()).unwrap();
        assert_eq!(sink.queue_depth(), 0);
        // A single event stays queued: it is below both batch thresholds.
        sink.send(&make_event("orders")).await.unwrap();
        assert_eq!(sink.queue_depth(), 1);
        sink.flush().await.unwrap();
        assert_eq!(sink.queue_depth(), 0);
        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn rotation_names_stay_unique_under_rapid_rotations() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");
        // A 1-byte threshold forces a rotation before every write into a
        // non-empty file.
        let config = FileJsonlSinkConfig {
            rotate_size_bytes: 1,
            fsync_every: 1,
        };
        let mut sink = FileJsonlSink::open_with(&path, config).unwrap();
        for table in ["t1", "t2", "t3"] {
            sink.send(&make_event(table)).await.unwrap();
            sink.flush().await.unwrap();
        }
        sink.close().await.unwrap();
        let rotated_files: Vec<String> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.file_name().to_string_lossy().to_string())
            // Exclude the active file so the count checks below are meaningful.
            .filter(|name| {
                name.as_str() != "events.jsonl"
                    && name.starts_with("events.")
                    && name.ends_with(".jsonl")
            })
            .collect();
        let unique_count = rotated_files
            .iter()
            .collect::<std::collections::HashSet<_>>()
            .len();
        assert_eq!(
            unique_count,
            rotated_files.len(),
            "rotated filenames must be unique: {rotated_files:?}"
        );
        // Three writes into a 1-byte budget rotate twice (before lines 2 and 3),
        // typically within the same millisecond.
        assert_eq!(
            rotated_files.len(),
            2,
            "expected two rotated files: {rotated_files:?}"
        );
        // No event may be lost across rotations.
        let total_lines: usize = std::iter::once(path.clone())
            .chain(rotated_files.iter().map(|name| dir.path().join(name)))
            .map(|p| count_lines(&p))
            .sum();
        assert_eq!(total_lines, 3);
    }

    #[tokio::test]
    async fn double_close_is_idempotent() {
        let tmp = NamedTempFile::new().unwrap();
        let mut sink = FileJsonlSink::open(tmp.path()).unwrap();
        sink.close().await.unwrap();
        sink.close().await.unwrap();
    }

    #[test]
    fn debug_impl_does_not_panic() {
        let tmp = NamedTempFile::new().unwrap();
        let sink = FileJsonlSink::open(tmp.path()).unwrap();
        let _ = format!("{sink:?}");
    }
}