use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use file_rotate::suffix::AppendTimestamp;
use file_rotate::suffix::FileLimit;
use file_rotate::{ContentLimit, FileRotate, compression::Compression};
use parking_lot::Mutex;
use tracing::debug;
use super::config::{FileWriterConfig, RotationPeriod};
pub struct NdjsonWriter {
writer: Mutex<FileRotate<AppendTimestamp>>,
label: String,
output_path: PathBuf,
lines_written: AtomicU64,
write_errors: AtomicU64,
}
impl std::fmt::Debug for NdjsonWriter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NdjsonWriter")
.field("label", &self.label)
.field("output_path", &self.output_path)
.field("lines_written", &self.lines_written.load(Ordering::Relaxed))
.field("write_errors", &self.write_errors.load(Ordering::Relaxed))
.finish_non_exhaustive()
}
}
impl NdjsonWriter {
pub fn new(
config: &FileWriterConfig,
subdir: &str,
filename: &str,
label: &str,
) -> Result<Self, std::io::Error> {
let dir = config.path.join(subdir);
std::fs::create_dir_all(&dir)?;
let file_path = dir.join(filename);
let content_limit = match config.rotation {
RotationPeriod::Hourly => ContentLimit::Time(file_rotate::TimeFrequency::Hourly),
RotationPeriod::Daily => ContentLimit::Time(file_rotate::TimeFrequency::Daily),
};
let max_age = chrono::Duration::days(i64::from(config.max_age_days));
let suffix_scheme = AppendTimestamp::default(FileLimit::Age(max_age));
let compression = if config.compress_rotated {
Compression::OnRotate(6)
} else {
Compression::None
};
let writer = FileRotate::new(file_path, suffix_scheme, content_limit, compression, None);
debug!(
label = label,
path = %dir.display(),
rotation = ?config.rotation,
"{} writer initialised",
label,
);
Ok(Self {
writer: Mutex::new(writer),
label: label.to_string(),
output_path: dir,
lines_written: AtomicU64::new(0),
write_errors: AtomicU64::new(0),
})
}
pub fn write_line(&self, line: &[u8]) -> Result<(), std::io::Error> {
let mut writer = self.writer.lock();
if let Err(e) = writer.write_all(line).and_then(|()| writer.flush()) {
self.write_errors.fetch_add(1, Ordering::Relaxed);
return Err(e);
}
self.lines_written.fetch_add(1, Ordering::Relaxed);
Ok(())
}
pub fn write_buf(&self, buf: &[u8], count: u64) -> Result<(), std::io::Error> {
let mut writer = self.writer.lock();
if let Err(e) = writer.write_all(buf).and_then(|()| writer.flush()) {
self.write_errors.fetch_add(1, Ordering::Relaxed);
return Err(e);
}
self.lines_written.fetch_add(count, Ordering::Relaxed);
Ok(())
}
pub fn lines_written(&self) -> u64 {
self.lines_written.load(Ordering::Relaxed)
}
pub fn write_errors(&self) -> u64 {
self.write_errors.load(Ordering::Relaxed)
}
pub fn label(&self) -> &str {
&self.label
}
pub fn output_path(&self) -> &PathBuf {
&self.output_path
}
}
#[derive(Debug, Clone)]
pub struct AsyncNdjsonWriter {
inner: Arc<NdjsonWriter>,
}
impl AsyncNdjsonWriter {
#[must_use]
pub fn new(writer: NdjsonWriter) -> Self {
Self {
inner: Arc::new(writer),
}
}
#[must_use]
pub fn from_arc(writer: Arc<NdjsonWriter>) -> Self {
Self { inner: writer }
}
pub async fn write_line(&self, line: Vec<u8>) -> Result<(), std::io::Error> {
let inner = Arc::clone(&self.inner);
tokio::task::spawn_blocking(move || inner.write_line(&line))
.await
.map_err(std::io::Error::other)?
}
pub async fn write_buf(&self, buf: Vec<u8>, count: u64) -> Result<(), std::io::Error> {
let inner = Arc::clone(&self.inner);
tokio::task::spawn_blocking(move || inner.write_buf(&buf, count))
.await
.map_err(std::io::Error::other)?
}
#[must_use]
pub fn lines_written(&self) -> u64 {
self.inner.lines_written()
}
#[must_use]
pub fn write_errors(&self) -> u64 {
self.inner.write_errors()
}
#[must_use]
pub fn label(&self) -> &str {
self.inner.label()
}
#[must_use]
pub fn output_path(&self) -> &Path {
self.inner.output_path().as_path()
}
#[must_use]
pub fn shared(&self) -> Arc<NdjsonWriter> {
Arc::clone(&self.inner)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_config(dir: &std::path::Path) -> FileWriterConfig {
FileWriterConfig {
path: dir.to_path_buf(),
rotation: RotationPeriod::Daily,
max_age_days: 1,
compress_rotated: false,
}
}
#[test]
fn test_write_single_line() {
let dir = tempfile::tempdir().expect("tempdir");
let config = test_config(dir.path());
let writer = NdjsonWriter::new(&config, "test-svc", "out.ndjson", "test").expect("create");
assert_eq!(writer.lines_written(), 0);
assert_eq!(writer.write_errors(), 0);
writer.write_line(b"{\"msg\":\"hello\"}\n").expect("write");
assert_eq!(writer.lines_written(), 1);
let content =
std::fs::read_to_string(dir.path().join("test-svc/out.ndjson")).expect("read");
assert_eq!(content.trim(), r#"{"msg":"hello"}"#);
}
#[test]
fn test_write_multiple_lines() {
let dir = tempfile::tempdir().expect("tempdir");
let config = test_config(dir.path());
let writer =
NdjsonWriter::new(&config, "multi", "events.ndjson", "output").expect("create");
for i in 0..3 {
let line = format!("{{\"n\":{i}}}\n");
writer.write_line(line.as_bytes()).expect("write");
}
assert_eq!(writer.lines_written(), 3);
let content =
std::fs::read_to_string(dir.path().join("multi/events.ndjson")).expect("read");
let lines: Vec<&str> = content.trim().lines().collect();
assert_eq!(lines.len(), 3);
}
#[test]
fn test_write_buf_batch() {
let dir = tempfile::tempdir().expect("tempdir");
let config = test_config(dir.path());
let writer = NdjsonWriter::new(&config, "batch", "out.ndjson", "test").expect("create");
let mut buf = Vec::new();
for i in 0..5 {
buf.extend_from_slice(format!("{{\"n\":{i}}}\n").as_bytes());
}
writer.write_buf(&buf, 5).expect("write batch");
assert_eq!(writer.lines_written(), 5);
let content = std::fs::read_to_string(dir.path().join("batch/out.ndjson")).expect("read");
let lines: Vec<&str> = content.trim().lines().collect();
assert_eq!(lines.len(), 5);
}
#[test]
fn test_debug_format() {
let dir = tempfile::tempdir().expect("tempdir");
let config = test_config(dir.path());
let writer = NdjsonWriter::new(&config, "dbg", "out.ndjson", "dlq").expect("create");
let debug = format!("{writer:?}");
assert!(debug.contains("NdjsonWriter"));
assert!(debug.contains("dlq"));
}
#[test]
fn test_label_and_path() {
let dir = tempfile::tempdir().expect("tempdir");
let config = test_config(dir.path());
let writer = NdjsonWriter::new(&config, "svc", "data.ndjson", "output").expect("create");
assert_eq!(writer.label(), "output");
assert_eq!(writer.output_path(), &dir.path().join("svc"));
}
#[tokio::test]
async fn async_write_line_writes_to_file() {
let dir = tempfile::tempdir().expect("tempdir");
let cfg = test_config(dir.path());
let writer = NdjsonWriter::new(&cfg, "async-svc", "out.ndjson", "test").expect("create");
let async_w = AsyncNdjsonWriter::new(writer);
async_w
.write_line(b"{\"k\":\"v\"}\n".to_vec())
.await
.expect("write_line");
assert_eq!(async_w.lines_written(), 1);
assert_eq!(async_w.write_errors(), 0);
assert_eq!(async_w.label(), "test");
assert_eq!(async_w.output_path(), dir.path().join("async-svc"));
let body = std::fs::read_to_string(dir.path().join("async-svc/out.ndjson")).expect("read");
assert_eq!(body.trim(), r#"{"k":"v"}"#);
}
#[tokio::test]
async fn async_writer_from_arc_shares_state() {
let dir = tempfile::tempdir().expect("tempdir");
let cfg = test_config(dir.path());
let writer = NdjsonWriter::new(&cfg, "share", "out.ndjson", "test").expect("create");
let shared = Arc::new(writer);
let a = AsyncNdjsonWriter::from_arc(Arc::clone(&shared));
let b = AsyncNdjsonWriter::from_arc(Arc::clone(&shared));
a.write_line(b"{\"a\":1}\n".to_vec()).await.expect("a");
b.write_line(b"{\"b\":2}\n".to_vec()).await.expect("b");
assert_eq!(a.lines_written(), 2);
assert_eq!(b.lines_written(), 2);
assert!(Arc::ptr_eq(&a.shared(), &b.shared()));
}
#[tokio::test]
async fn async_write_buf_writes_batch() {
let dir = tempfile::tempdir().expect("tempdir");
let cfg = test_config(dir.path());
let writer = NdjsonWriter::new(&cfg, "batch", "out.ndjson", "test").expect("create");
let async_w = AsyncNdjsonWriter::new(writer);
let mut buf = Vec::new();
for i in 0..5 {
buf.extend_from_slice(format!("{{\"n\":{i}}}\n").as_bytes());
}
async_w.write_buf(buf, 5).await.expect("write_buf");
assert_eq!(async_w.lines_written(), 5);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn async_writer_does_not_block_runtime() {
let dir = tempfile::tempdir().expect("tempdir");
let cfg = test_config(dir.path());
let writer = NdjsonWriter::new(&cfg, "concurrent", "out.ndjson", "test").expect("create");
let async_w = AsyncNdjsonWriter::new(writer);
let ticker_fired = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let tf = ticker_fired.clone();
let ticker = tokio::spawn(async move {
let mut t = tokio::time::interval(std::time::Duration::from_millis(2));
t.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
t.tick().await; for _ in 0..20 {
t.tick().await;
tf.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
});
let mut writers = Vec::new();
for _ in 0..4 {
let w = async_w.clone();
writers.push(tokio::spawn(async move {
for i in 0..50_u32 {
w.write_line(format!("{{\"n\":{i}}}\n").into_bytes())
.await
.expect("write");
}
}));
}
for h in writers {
h.await.expect("writer task");
}
ticker.await.expect("ticker task");
assert_eq!(async_w.lines_written(), 200);
let ticks = ticker_fired.load(std::sync::atomic::Ordering::SeqCst);
assert!(
ticks >= 10,
"ticker fired only {ticks} times — writers starved the runtime",
);
}
}