Skip to main content

hyperi_rustlib/io/
ndjson_writer.rs

// Project:   hyperi-rustlib
// File:      src/io/ndjson_writer.rs
// Purpose:   Core NDJSON file writer with rotation and metrics
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Core NDJSON file writer with automatic rotation.
//!
//! Writes `&[u8]` lines to a rotating file using the `file-rotate` crate.
//! This writer knows nothing about DLQ or output semantics -- callers
//! serialise their own types and hand raw bytes to the writer.
//!
//! ## Two APIs
//!
//! - [`NdjsonWriter`] -- synchronous. Acquires a `parking_lot::Mutex` and
//!   calls `std::io::Write::write_all` directly. Cheap (~µs) but blocks
//!   the calling thread. Safe to call from non-async code and tests.
//! - [`AsyncNdjsonWriter`] -- async wrapper over `Arc<NdjsonWriter>`. Each
//!   call runs the sync work on a `tokio::task::spawn_blocking` thread,
//!   so the tokio runtime is never stalled. Use this from `async fn`
//!   bodies.
//!
//! ## Thread Safety
//!
//! Both wrappers are `Send + Sync`. `NdjsonWriter` uses
//! `parking_lot::Mutex<FileRotate>` internally so multiple callers can
//! share one writer instance.
30
31use std::io::Write;
32use std::path::{Path, PathBuf};
33use std::sync::Arc;
34use std::sync::atomic::{AtomicU64, Ordering};
35
36use file_rotate::suffix::AppendTimestamp;
37use file_rotate::suffix::FileLimit;
38use file_rotate::{ContentLimit, FileRotate, compression::Compression};
39use parking_lot::Mutex;
40use tracing::debug;
41
42use super::config::{FileWriterConfig, RotationPeriod};
43
44/// NDJSON file writer with automatic rotation and metrics.
45///
46/// Each line written is expected to be a complete JSON object (NDJSON format).
47/// The writer handles file rotation, optional compression, and age-based cleanup.
48pub struct NdjsonWriter {
49    writer: Mutex<FileRotate<AppendTimestamp>>,
50    label: String,
51    output_path: PathBuf,
52    lines_written: AtomicU64,
53    write_errors: AtomicU64,
54}
55
56impl std::fmt::Debug for NdjsonWriter {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("NdjsonWriter")
59            .field("label", &self.label)
60            .field("output_path", &self.output_path)
61            .field("lines_written", &self.lines_written.load(Ordering::Relaxed))
62            .field("write_errors", &self.write_errors.load(Ordering::Relaxed))
63            .finish_non_exhaustive()
64    }
65}
66
67impl NdjsonWriter {
68    /// Create a new NDJSON writer.
69    ///
70    /// Creates `{config.path}/{subdir}/` and writes to `{filename}` within it.
71    /// Files are rotated according to the config's rotation period.
72    ///
73    /// # Arguments
74    ///
75    /// * `config` -- Shared file writer settings (path, rotation, compression)
76    /// * `subdir` -- Subdirectory under `config.path` (e.g. service name)
77    /// * `filename` -- Output filename (e.g. "dlq.ndjson", "events.ndjson")
78    /// * `label` -- Human label for log messages (e.g. "dlq", "output")
79    ///
80    /// # Errors
81    ///
82    /// Returns `std::io::Error` if the output directory cannot be created.
83    pub fn new(
84        config: &FileWriterConfig,
85        subdir: &str,
86        filename: &str,
87        label: &str,
88    ) -> Result<Self, std::io::Error> {
89        let dir = config.path.join(subdir);
90        std::fs::create_dir_all(&dir)?;
91
92        let file_path = dir.join(filename);
93
94        let content_limit = match config.rotation {
95            RotationPeriod::Hourly => ContentLimit::Time(file_rotate::TimeFrequency::Hourly),
96            RotationPeriod::Daily => ContentLimit::Time(file_rotate::TimeFrequency::Daily),
97        };
98
99        let max_age = chrono::Duration::days(i64::from(config.max_age_days));
100        let suffix_scheme = AppendTimestamp::default(FileLimit::Age(max_age));
101
102        let compression = if config.compress_rotated {
103            Compression::OnRotate(6)
104        } else {
105            Compression::None
106        };
107
108        let writer = FileRotate::new(file_path, suffix_scheme, content_limit, compression, None);
109
110        debug!(
111            label = label,
112            path = %dir.display(),
113            rotation = ?config.rotation,
114            "{} writer initialised",
115            label,
116        );
117
118        Ok(Self {
119            writer: Mutex::new(writer),
120            label: label.to_string(),
121            output_path: dir,
122            lines_written: AtomicU64::new(0),
123            write_errors: AtomicU64::new(0),
124        })
125    }
126
127    /// Write a single line (must include trailing newline or caller appends it).
128    ///
129    /// The data is written as-is -- caller is responsible for serialisation
130    /// and newline termination.
131    pub fn write_line(&self, line: &[u8]) -> Result<(), std::io::Error> {
132        let mut writer = self.writer.lock();
133        if let Err(e) = writer.write_all(line).and_then(|()| writer.flush()) {
134            self.write_errors.fetch_add(1, Ordering::Relaxed);
135            return Err(e);
136        }
137        self.lines_written.fetch_add(1, Ordering::Relaxed);
138        Ok(())
139    }
140
141    /// Write a pre-serialised buffer containing multiple newline-delimited lines.
142    ///
143    /// The buffer should already have newlines between entries. The `count`
144    /// parameter is used for metrics tracking.
145    pub fn write_buf(&self, buf: &[u8], count: u64) -> Result<(), std::io::Error> {
146        let mut writer = self.writer.lock();
147        if let Err(e) = writer.write_all(buf).and_then(|()| writer.flush()) {
148            self.write_errors.fetch_add(1, Ordering::Relaxed);
149            return Err(e);
150        }
151        self.lines_written.fetch_add(count, Ordering::Relaxed);
152        Ok(())
153    }
154
155    /// Flush any in-memory buffer through the rotating writer.
156    ///
157    /// Note: `file-rotate` does not expose the underlying `File` handle,
158    /// so this can only call `flush()` (which pushes buffered bytes from
159    /// the writer's in-memory buffer to the kernel page cache). It does
160    /// NOT guarantee on-disk durability -- a power-loss event between the
161    /// page cache and the platter can still lose data. This is a
162    /// limitation of the rotation library, not the API: when
163    /// `file-rotate` gains a `sync` hook, this method will gain
164    /// `fsync`-level semantics.
165    ///
166    /// For DLQ flush callers: this is the strongest durability the file
167    /// backend can express today. Producers running with high write
168    /// rates plus an `acks=all` sibling Kafka backend will get real
169    /// durability from the Kafka path.
170    ///
171    /// # Errors
172    ///
173    /// Returns the underlying `std::io::Error` if the flush fails. The
174    /// internal `write_errors` counter is incremented.
175    pub fn flush(&self) -> Result<(), std::io::Error> {
176        let mut writer = self.writer.lock();
177        if let Err(e) = writer.flush() {
178            self.write_errors.fetch_add(1, Ordering::Relaxed);
179            return Err(e);
180        }
181        Ok(())
182    }
183
184    /// Number of lines successfully written.
185    pub fn lines_written(&self) -> u64 {
186        self.lines_written.load(Ordering::Relaxed)
187    }
188
189    /// Number of write errors encountered.
190    pub fn write_errors(&self) -> u64 {
191        self.write_errors.load(Ordering::Relaxed)
192    }
193
194    /// Human label for this writer (e.g. "dlq", "output").
195    pub fn label(&self) -> &str {
196        &self.label
197    }
198
199    /// Output directory path.
200    pub fn output_path(&self) -> &PathBuf {
201        &self.output_path
202    }
203}
204
205/// Async wrapper around [`NdjsonWriter`] that runs the sync rotate-and-write
206/// on `tokio::task::spawn_blocking` to keep the tokio runtime unblocked.
207///
208/// Use this from `async fn` bodies. For sync code paths, call
209/// [`NdjsonWriter`] directly.
210///
211/// Holds an `Arc<NdjsonWriter>` so multiple async tasks can share one
212/// writer without cloning the underlying `parking_lot::Mutex<FileRotate>`.
213#[derive(Debug, Clone)]
214pub struct AsyncNdjsonWriter {
215    inner: Arc<NdjsonWriter>,
216}
217
218impl AsyncNdjsonWriter {
219    /// Wrap an `NdjsonWriter`. Use this when no other task needs the
220    /// underlying writer.
221    #[must_use]
222    pub fn new(writer: NdjsonWriter) -> Self {
223        Self {
224            inner: Arc::new(writer),
225        }
226    }
227
228    /// Wrap a shared `Arc<NdjsonWriter>`. Use this when sync code paths
229    /// also need access to the same writer.
230    #[must_use]
231    pub fn from_arc(writer: Arc<NdjsonWriter>) -> Self {
232        Self { inner: writer }
233    }
234
235    /// Write a single line off-runtime. The closure runs on a blocking
236    /// thread; the tokio runtime is free to schedule other tasks.
237    ///
238    /// # Errors
239    ///
240    /// Returns the underlying `std::io::Error` from the sync writer, or
241    /// an `io::Error::other(JoinError)` if the blocking thread panicked.
242    pub async fn write_line(&self, line: Vec<u8>) -> Result<(), std::io::Error> {
243        let inner = Arc::clone(&self.inner);
244        tokio::task::spawn_blocking(move || inner.write_line(&line))
245            .await
246            .map_err(std::io::Error::other)?
247    }
248
249    /// Write a pre-coalesced buffer of `count` lines off-runtime.
250    ///
251    /// # Errors
252    ///
253    /// As [`Self::write_line`].
254    pub async fn write_buf(&self, buf: Vec<u8>, count: u64) -> Result<(), std::io::Error> {
255        let inner = Arc::clone(&self.inner);
256        tokio::task::spawn_blocking(move || inner.write_buf(&buf, count))
257            .await
258            .map_err(std::io::Error::other)?
259    }
260
261    /// Flush buffered bytes through the rotating writer off-runtime.
262    ///
263    /// See [`NdjsonWriter::flush`] for durability semantics -- currently
264    /// flushes to kernel page cache only (the `file-rotate` crate
265    /// doesn't expose the inner `File` for `fsync`).
266    ///
267    /// # Errors
268    ///
269    /// As [`Self::write_line`].
270    pub async fn flush(&self) -> Result<(), std::io::Error> {
271        let inner = Arc::clone(&self.inner);
272        tokio::task::spawn_blocking(move || inner.flush())
273            .await
274            .map_err(std::io::Error::other)?
275    }
276
277    /// Number of lines successfully written.
278    #[must_use]
279    pub fn lines_written(&self) -> u64 {
280        self.inner.lines_written()
281    }
282
283    /// Number of write errors.
284    #[must_use]
285    pub fn write_errors(&self) -> u64 {
286        self.inner.write_errors()
287    }
288
289    /// Human label.
290    #[must_use]
291    pub fn label(&self) -> &str {
292        self.inner.label()
293    }
294
295    /// Output directory path.
296    #[must_use]
297    pub fn output_path(&self) -> &Path {
298        self.inner.output_path().as_path()
299    }
300
301    /// Shared `Arc<NdjsonWriter>` for code paths that need sync access.
302    #[must_use]
303    pub fn shared(&self) -> Arc<NdjsonWriter> {
304        Arc::clone(&self.inner)
305    }
306}
307
#[cfg(test)]
mod tests {
    use super::*;

    fn test_config(dir: &std::path::Path) -> FileWriterConfig {
        FileWriterConfig {
            path: dir.to_path_buf(),
            rotation: RotationPeriod::Daily,
            max_age_days: 1,
            compress_rotated: false,
        }
    }

    #[test]
    fn test_write_single_line() {
        let dir = tempfile::tempdir().expect("tempdir");
        let config = test_config(dir.path());

        let writer = NdjsonWriter::new(&config, "test-svc", "out.ndjson", "test").expect("create");
        assert_eq!(writer.lines_written(), 0);
        assert_eq!(writer.write_errors(), 0);

        writer.write_line(b"{\"msg\":\"hello\"}\n").expect("write");
        assert_eq!(writer.lines_written(), 1);

        let content =
            std::fs::read_to_string(dir.path().join("test-svc/out.ndjson")).expect("read");
        assert_eq!(content.trim(), r#"{"msg":"hello"}"#);
    }

    #[test]
    fn test_write_multiple_lines() {
        let dir = tempfile::tempdir().expect("tempdir");
        let config = test_config(dir.path());

        let writer =
            NdjsonWriter::new(&config, "multi", "events.ndjson", "output").expect("create");

        for i in 0..3 {
            let line = format!("{{\"n\":{i}}}\n");
            writer.write_line(line.as_bytes()).expect("write");
        }
        assert_eq!(writer.lines_written(), 3);

        let content =
            std::fs::read_to_string(dir.path().join("multi/events.ndjson")).expect("read");
        let lines: Vec<&str> = content.trim().lines().collect();
        assert_eq!(lines.len(), 3);
    }

    #[test]
    fn test_write_buf_batch() {
        let dir = tempfile::tempdir().expect("tempdir");
        let config = test_config(dir.path());

        let writer = NdjsonWriter::new(&config, "batch", "out.ndjson", "test").expect("create");

        let mut buf = Vec::new();
        for i in 0..5 {
            buf.extend_from_slice(format!("{{\"n\":{i}}}\n").as_bytes());
        }
        writer.write_buf(&buf, 5).expect("write batch");
        assert_eq!(writer.lines_written(), 5);

        let content = std::fs::read_to_string(dir.path().join("batch/out.ndjson")).expect("read");
        let lines: Vec<&str> = content.trim().lines().collect();
        assert_eq!(lines.len(), 5);
    }

    #[test]
    fn test_debug_format() {
        let dir = tempfile::tempdir().expect("tempdir");
        let config = test_config(dir.path());

        let writer = NdjsonWriter::new(&config, "dbg", "out.ndjson", "dlq").expect("create");
        let debug = format!("{writer:?}");
        assert!(debug.contains("NdjsonWriter"));
        assert!(debug.contains("dlq"));
    }

    #[test]
    fn test_label_and_path() {
        let dir = tempfile::tempdir().expect("tempdir");
        let config = test_config(dir.path());

        let writer = NdjsonWriter::new(&config, "svc", "data.ndjson", "output").expect("create");
        assert_eq!(writer.label(), "output");
        assert_eq!(writer.output_path(), &dir.path().join("svc"));
    }

    // -----------------------------------------------------------------
    // AsyncNdjsonWriter tests -- these prove the async wrapper actually
    // moves the sync work off the runtime thread.
    // -----------------------------------------------------------------

    #[tokio::test]
    async fn async_write_line_writes_to_file() {
        let dir = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(dir.path());
        let writer = NdjsonWriter::new(&cfg, "async-svc", "out.ndjson", "test").expect("create");
        let async_w = AsyncNdjsonWriter::new(writer);

        async_w
            .write_line(b"{\"k\":\"v\"}\n".to_vec())
            .await
            .expect("write_line");
        assert_eq!(async_w.lines_written(), 1);
        assert_eq!(async_w.write_errors(), 0);
        assert_eq!(async_w.label(), "test");
        assert_eq!(async_w.output_path(), dir.path().join("async-svc"));

        let body = std::fs::read_to_string(dir.path().join("async-svc/out.ndjson")).expect("read");
        assert_eq!(body.trim(), r#"{"k":"v"}"#);
    }

    #[tokio::test]
    async fn async_writer_from_arc_shares_state() {
        let dir = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(dir.path());
        let writer = NdjsonWriter::new(&cfg, "share", "out.ndjson", "test").expect("create");
        let shared = Arc::new(writer);
        let a = AsyncNdjsonWriter::from_arc(Arc::clone(&shared));
        let b = AsyncNdjsonWriter::from_arc(Arc::clone(&shared));

        a.write_line(b"{\"a\":1}\n".to_vec()).await.expect("a");
        b.write_line(b"{\"b\":2}\n".to_vec()).await.expect("b");

        // Both views see the shared counter.
        assert_eq!(a.lines_written(), 2);
        assert_eq!(b.lines_written(), 2);
        assert!(Arc::ptr_eq(&a.shared(), &b.shared()));
    }

    #[tokio::test]
    async fn async_write_buf_writes_batch() {
        let dir = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(dir.path());
        let writer = NdjsonWriter::new(&cfg, "batch", "out.ndjson", "test").expect("create");
        let async_w = AsyncNdjsonWriter::new(writer);

        let mut buf = Vec::new();
        for i in 0..5 {
            buf.extend_from_slice(format!("{{\"n\":{i}}}\n").as_bytes());
        }
        async_w.write_buf(buf, 5).await.expect("write_buf");
        assert_eq!(async_w.lines_written(), 5);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn async_writer_does_not_block_runtime() {
        // On a single-threaded runtime, a write that ran synchronously would
        // hold the only runtime thread until it returned, so a concurrently
        // spawned task could not be polled until all writes were done.
        // Because the async wrapper awaits `spawn_blocking`, each write
        // yields and the spinner below gets polled while writes are in
        // flight.
        //
        // The spinner count is sampled BEFORE the spinner is stopped and
        // joined, so the assertion reflects progress made during the writes
        // rather than progress made afterwards.
        let dir = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(dir.path());
        let writer = NdjsonWriter::new(&cfg, "concurrent", "out.ndjson", "test").expect("create");
        let async_w = AsyncNdjsonWriter::new(writer);

        let spins = Arc::new(AtomicU64::new(0));
        let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let spinner = {
            let spins = Arc::clone(&spins);
            let stop = Arc::clone(&stop);
            tokio::spawn(async move {
                while !stop.load(Ordering::SeqCst) {
                    spins.fetch_add(1, Ordering::SeqCst);
                    tokio::task::yield_now().await;
                }
            })
        };

        for i in 0..50_u32 {
            async_w
                .write_line(format!("{{\"n\":{i}}}\n").into_bytes())
                .await
                .expect("write");
        }
        let spins_during_writes = spins.load(Ordering::SeqCst);

        stop.store(true, Ordering::SeqCst);
        spinner.await.expect("spinner task");

        assert_eq!(async_w.lines_written(), 50);
        assert!(
            spins_during_writes > 0,
            "spinner never ran while writes were in flight -- writes blocked the runtime",
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn async_concurrent_writers_count_every_line() {
        // Several tasks sharing one writer: every line must be counted and
        // land in the file intact (the mutex serialises whole lines).
        let dir = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(dir.path());
        let writer =
            NdjsonWriter::new(&cfg, "concurrent-many", "out.ndjson", "test").expect("create");
        let async_w = AsyncNdjsonWriter::new(writer);

        let mut writers = Vec::new();
        for _ in 0..4 {
            let w = async_w.clone();
            writers.push(tokio::spawn(async move {
                for i in 0..50_u32 {
                    w.write_line(format!("{{\"n\":{i}}}\n").into_bytes())
                        .await
                        .expect("write");
                }
            }));
        }
        for h in writers {
            h.await.expect("writer task");
        }

        assert_eq!(async_w.lines_written(), 200);
        let body = std::fs::read_to_string(dir.path().join("concurrent-many/out.ndjson"))
            .expect("read");
        assert_eq!(body.lines().count(), 200);
    }
}