Skip to main content

hyperi_rustlib/io/
ndjson_writer.rs

1// Project:   hyperi-rustlib
2// File:      src/io/ndjson_writer.rs
3// Purpose:   Core NDJSON file writer with rotation and metrics
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Core NDJSON file writer with automatic rotation.
10//!
11//! Writes `&[u8]` lines to a rotating file using the `file-rotate` crate.
12//! This writer knows nothing about DLQ or output semantics -- callers
13//! serialise their own types and hand raw bytes to the writer.
14//!
15//! ## Two APIs
16//!
17//! - [`NdjsonWriter`] -- synchronous. Acquires a `parking_lot::Mutex` and
18//!   calls `std::io::Write::write_all` directly. Cheap (~µs) but blocks
19//!   the calling thread. Safe to call from non-async code and tests.
20//! - [`AsyncNdjsonWriter`] -- async wrapper over `Arc<NdjsonWriter>`. Each
21//!   call runs the sync work on a `tokio::task::spawn_blocking` thread,
22//!   so the tokio runtime is never stalled. Use this from `async fn`
23//!   bodies.
24//!
25//! ## Thread Safety
26//!
27//! Both wrappers are `Send + Sync`. `NdjsonWriter` uses
28//! `parking_lot::Mutex<FileRotate>` internally so multiple callers can
29//! share one writer instance.
30
31use std::io::Write;
32use std::path::{Path, PathBuf};
33use std::sync::Arc;
34use std::sync::atomic::{AtomicU64, Ordering};
35
36use file_rotate::suffix::AppendTimestamp;
37use file_rotate::suffix::FileLimit;
38use file_rotate::{ContentLimit, FileRotate, compression::Compression};
39use parking_lot::Mutex;
40use tracing::debug;
41
42use super::config::{FileWriterConfig, RotationPeriod};
43
/// NDJSON file writer with automatic rotation and metrics.
///
/// Each line written is expected to be a complete JSON object (NDJSON format).
/// The writer handles file rotation, optional compression, and age-based cleanup.
///
/// All state is behind a mutex or atomics, so methods take `&self` and one
/// instance can be shared between callers.
pub struct NdjsonWriter {
    /// Rotating file handle; the mutex serialises callers sharing one writer.
    writer: Mutex<FileRotate<AppendTimestamp>>,
    /// Human label used in log messages and returned by `label()`.
    label: String,
    /// Directory holding the output file (`{config.path}/{subdir}`), not the
    /// file itself.
    output_path: PathBuf,
    /// Running total of lines reported as successfully written.
    lines_written: AtomicU64,
    /// Running total of failed write/flush calls.
    write_errors: AtomicU64,
}
55
56impl std::fmt::Debug for NdjsonWriter {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("NdjsonWriter")
59            .field("label", &self.label)
60            .field("output_path", &self.output_path)
61            .field("lines_written", &self.lines_written.load(Ordering::Relaxed))
62            .field("write_errors", &self.write_errors.load(Ordering::Relaxed))
63            .finish_non_exhaustive()
64    }
65}
66
67impl NdjsonWriter {
68    /// Create a new NDJSON writer.
69    ///
70    /// Creates `{config.path}/{subdir}/` and writes to `{filename}` within it.
71    /// Files are rotated according to the config's rotation period.
72    ///
73    /// # Arguments
74    ///
75    /// * `config` -- Shared file writer settings (path, rotation, compression)
76    /// * `subdir` -- Subdirectory under `config.path` (e.g. service name)
77    /// * `filename` -- Output filename (e.g. "dlq.ndjson", "events.ndjson")
78    /// * `label` -- Human label for log messages (e.g. "dlq", "output")
79    ///
80    /// # Errors
81    ///
82    /// Returns `std::io::Error` if the output directory cannot be created.
83    pub fn new(
84        config: &FileWriterConfig,
85        subdir: &str,
86        filename: &str,
87        label: &str,
88    ) -> Result<Self, std::io::Error> {
89        let dir = config.path.join(subdir);
90        std::fs::create_dir_all(&dir)?;
91
92        let file_path = dir.join(filename);
93
94        let content_limit = match config.rotation {
95            RotationPeriod::Hourly => ContentLimit::Time(file_rotate::TimeFrequency::Hourly),
96            RotationPeriod::Daily => ContentLimit::Time(file_rotate::TimeFrequency::Daily),
97        };
98
99        let max_age = chrono::Duration::days(i64::from(config.max_age_days));
100        let suffix_scheme = AppendTimestamp::default(FileLimit::Age(max_age));
101
102        let compression = if config.compress_rotated {
103            Compression::OnRotate(6)
104        } else {
105            Compression::None
106        };
107
108        let writer = FileRotate::new(file_path, suffix_scheme, content_limit, compression, None);
109
110        debug!(
111            label = label,
112            path = %dir.display(),
113            rotation = ?config.rotation,
114            "{} writer initialised",
115            label,
116        );
117
118        Ok(Self {
119            writer: Mutex::new(writer),
120            label: label.to_string(),
121            output_path: dir,
122            lines_written: AtomicU64::new(0),
123            write_errors: AtomicU64::new(0),
124        })
125    }
126
127    /// Write a single line (must include trailing newline or caller appends it).
128    ///
129    /// The data is written as-is -- caller is responsible for serialisation
130    /// and newline termination.
131    pub fn write_line(&self, line: &[u8]) -> Result<(), std::io::Error> {
132        let mut writer = self.writer.lock();
133        if let Err(e) = writer.write_all(line).and_then(|()| writer.flush()) {
134            self.write_errors.fetch_add(1, Ordering::Relaxed);
135            return Err(e);
136        }
137        self.lines_written.fetch_add(1, Ordering::Relaxed);
138        Ok(())
139    }
140
141    /// Write a pre-serialised buffer containing multiple newline-delimited lines.
142    ///
143    /// The buffer should already have newlines between entries. The `count`
144    /// parameter is used for metrics tracking.
145    pub fn write_buf(&self, buf: &[u8], count: u64) -> Result<(), std::io::Error> {
146        let mut writer = self.writer.lock();
147        if let Err(e) = writer.write_all(buf).and_then(|()| writer.flush()) {
148            self.write_errors.fetch_add(1, Ordering::Relaxed);
149            return Err(e);
150        }
151        self.lines_written.fetch_add(count, Ordering::Relaxed);
152        Ok(())
153    }
154
155    /// Flush the in-memory buffer through the rotating writer.
156    ///
157    /// `file-rotate` doesn't expose the inner `File`, so this flushes to
158    /// the kernel page cache only -- NOT on-disk durability. Power loss
159    /// before write-back can still lose data. Strongest the file backend
160    /// can express until `file-rotate` gains a sync hook; for real
161    /// durability pair with an `acks=all` Kafka backend.
162    ///
163    /// # Errors
164    ///
165    /// Returns the underlying `std::io::Error` if the flush fails. The
166    /// internal `write_errors` counter is incremented.
167    pub fn flush(&self) -> Result<(), std::io::Error> {
168        let mut writer = self.writer.lock();
169        if let Err(e) = writer.flush() {
170            self.write_errors.fetch_add(1, Ordering::Relaxed);
171            return Err(e);
172        }
173        Ok(())
174    }
175
176    /// Number of lines successfully written.
177    pub fn lines_written(&self) -> u64 {
178        self.lines_written.load(Ordering::Relaxed)
179    }
180
181    /// Number of write errors encountered.
182    pub fn write_errors(&self) -> u64 {
183        self.write_errors.load(Ordering::Relaxed)
184    }
185
186    /// Human label for this writer (e.g. "dlq", "output").
187    pub fn label(&self) -> &str {
188        &self.label
189    }
190
191    /// Output directory path.
192    pub fn output_path(&self) -> &PathBuf {
193        &self.output_path
194    }
195}
196
/// Async wrapper around [`NdjsonWriter`] that runs the sync rotate-and-write
/// on `tokio::task::spawn_blocking` to keep the tokio runtime unblocked.
///
/// Use this from `async fn` bodies. For sync code paths, call
/// [`NdjsonWriter`] directly.
///
/// Holds an `Arc<NdjsonWriter>` so multiple async tasks can share one
/// writer without cloning the underlying `parking_lot::Mutex<FileRotate>`.
/// Cloning this wrapper clones the `Arc`, so every clone refers to the same
/// writer and the same counters.
#[derive(Debug, Clone)]
pub struct AsyncNdjsonWriter {
    /// The shared synchronous writer that all operations delegate to.
    inner: Arc<NdjsonWriter>,
}
209
210impl AsyncNdjsonWriter {
211    /// Wrap an `NdjsonWriter`. Use this when no other task needs the
212    /// underlying writer.
213    #[must_use]
214    pub fn new(writer: NdjsonWriter) -> Self {
215        Self {
216            inner: Arc::new(writer),
217        }
218    }
219
220    /// Wrap a shared `Arc<NdjsonWriter>`. Use this when sync code paths
221    /// also need access to the same writer.
222    #[must_use]
223    pub fn from_arc(writer: Arc<NdjsonWriter>) -> Self {
224        Self { inner: writer }
225    }
226
227    /// Write a single line off-runtime. The closure runs on a blocking
228    /// thread; the tokio runtime is free to schedule other tasks.
229    ///
230    /// # Errors
231    ///
232    /// Returns the underlying `std::io::Error` from the sync writer, or
233    /// an `io::Error::other(JoinError)` if the blocking thread panicked.
234    pub async fn write_line(&self, line: Vec<u8>) -> Result<(), std::io::Error> {
235        let inner = Arc::clone(&self.inner);
236        tokio::task::spawn_blocking(move || inner.write_line(&line))
237            .await
238            .map_err(std::io::Error::other)?
239    }
240
241    /// Write a pre-coalesced buffer of `count` lines off-runtime.
242    ///
243    /// # Errors
244    ///
245    /// As [`Self::write_line`].
246    pub async fn write_buf(&self, buf: Vec<u8>, count: u64) -> Result<(), std::io::Error> {
247        let inner = Arc::clone(&self.inner);
248        tokio::task::spawn_blocking(move || inner.write_buf(&buf, count))
249            .await
250            .map_err(std::io::Error::other)?
251    }
252
253    /// Flush buffered bytes through the rotating writer off-runtime.
254    ///
255    /// See [`NdjsonWriter::flush`] for durability semantics -- currently
256    /// flushes to kernel page cache only (the `file-rotate` crate
257    /// doesn't expose the inner `File` for `fsync`).
258    ///
259    /// # Errors
260    ///
261    /// As [`Self::write_line`].
262    pub async fn flush(&self) -> Result<(), std::io::Error> {
263        let inner = Arc::clone(&self.inner);
264        tokio::task::spawn_blocking(move || inner.flush())
265            .await
266            .map_err(std::io::Error::other)?
267    }
268
269    /// Number of lines successfully written.
270    #[must_use]
271    pub fn lines_written(&self) -> u64 {
272        self.inner.lines_written()
273    }
274
275    /// Number of write errors.
276    #[must_use]
277    pub fn write_errors(&self) -> u64 {
278        self.inner.write_errors()
279    }
280
281    /// Human label.
282    #[must_use]
283    pub fn label(&self) -> &str {
284        self.inner.label()
285    }
286
287    /// Output directory path.
288    #[must_use]
289    pub fn output_path(&self) -> &Path {
290        self.inner.output_path().as_path()
291    }
292
293    /// Shared `Arc<NdjsonWriter>` for code paths that need sync access.
294    #[must_use]
295    pub fn shared(&self) -> Arc<NdjsonWriter> {
296        Arc::clone(&self.inner)
297    }
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    /// Config rooted at `dir`: daily rotation, one day of retention, and no
    /// compression of rotated files.
    fn test_config(dir: &std::path::Path) -> FileWriterConfig {
        let path = dir.to_path_buf();
        FileWriterConfig {
            path,
            rotation: RotationPeriod::Daily,
            max_age_days: 1,
            compress_rotated: false,
        }
    }

    /// One `write_line` bumps the line counter and lands in the output file.
    #[test]
    fn test_write_single_line() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(tmp.path());
        let sink = NdjsonWriter::new(&cfg, "test-svc", "out.ndjson", "test").expect("create");

        // A fresh writer starts with zeroed metrics.
        assert_eq!(sink.lines_written(), 0);
        assert_eq!(sink.write_errors(), 0);

        sink.write_line(b"{\"msg\":\"hello\"}\n").expect("write");
        assert_eq!(sink.lines_written(), 1);

        let out_file = tmp.path().join("test-svc/out.ndjson");
        let content = std::fs::read_to_string(out_file).expect("read");
        assert_eq!(content.trim(), r#"{"msg":"hello"}"#);
    }

    /// Three separate writes produce three lines and a counter of three.
    #[test]
    fn test_write_multiple_lines() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(tmp.path());
        let sink = NdjsonWriter::new(&cfg, "multi", "events.ndjson", "output").expect("create");

        (0..3).for_each(|i| {
            let line = format!("{{\"n\":{i}}}\n");
            sink.write_line(line.as_bytes()).expect("write");
        });
        assert_eq!(sink.lines_written(), 3);

        let out_file = tmp.path().join("multi/events.ndjson");
        let content = std::fs::read_to_string(out_file).expect("read");
        assert_eq!(content.trim().lines().count(), 3);
    }

    /// A five-line buffer written via `write_buf` counts as five lines.
    #[test]
    fn test_write_buf_batch() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(tmp.path());
        let sink = NdjsonWriter::new(&cfg, "batch", "out.ndjson", "test").expect("create");

        let buf: Vec<u8> = (0..5)
            .flat_map(|i| format!("{{\"n\":{i}}}\n").into_bytes())
            .collect();
        sink.write_buf(&buf, 5).expect("write batch");
        assert_eq!(sink.lines_written(), 5);

        let out_file = tmp.path().join("batch/out.ndjson");
        let content = std::fs::read_to_string(out_file).expect("read");
        assert_eq!(content.trim().lines().count(), 5);
    }

    /// The `Debug` output names the type and includes the label.
    #[test]
    fn test_debug_format() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(tmp.path());
        let sink = NdjsonWriter::new(&cfg, "dbg", "out.ndjson", "dlq").expect("create");

        let rendered = format!("{sink:?}");
        assert!(rendered.contains("NdjsonWriter"));
        assert!(rendered.contains("dlq"));
    }

    /// `label()` echoes the constructor argument; `output_path()` is the
    /// subdirectory, not the file.
    #[test]
    fn test_label_and_path() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(tmp.path());
        let sink = NdjsonWriter::new(&cfg, "svc", "data.ndjson", "output").expect("create");

        let expected_dir = tmp.path().join("svc");
        assert_eq!(sink.label(), "output");
        assert_eq!(sink.output_path(), &expected_dir);
    }

    // -----------------------------------------------------------------
    // AsyncNdjsonWriter tests -- exercise the async wrapper, which runs
    // the sync work via `spawn_blocking`.
    // -----------------------------------------------------------------

    /// The async `write_line` reaches the file and the delegating getters
    /// report the wrapped writer's state.
    #[tokio::test]
    async fn async_write_line_writes_to_file() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(tmp.path());
        let sync_writer =
            NdjsonWriter::new(&cfg, "async-svc", "out.ndjson", "test").expect("create");
        let async_w = AsyncNdjsonWriter::new(sync_writer);

        let payload = b"{\"k\":\"v\"}\n".to_vec();
        async_w.write_line(payload).await.expect("write_line");

        assert_eq!(async_w.lines_written(), 1);
        assert_eq!(async_w.write_errors(), 0);
        assert_eq!(async_w.label(), "test");
        assert_eq!(async_w.output_path(), tmp.path().join("async-svc"));

        let out_file = tmp.path().join("async-svc/out.ndjson");
        let body = std::fs::read_to_string(out_file).expect("read");
        assert_eq!(body.trim(), r#"{"k":"v"}"#);
    }

    /// Two wrappers built from the same `Arc` observe one shared counter.
    #[tokio::test]
    async fn async_writer_from_arc_shares_state() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(tmp.path());
        let shared =
            Arc::new(NdjsonWriter::new(&cfg, "share", "out.ndjson", "test").expect("create"));
        let a = AsyncNdjsonWriter::from_arc(Arc::clone(&shared));
        let b = AsyncNdjsonWriter::from_arc(Arc::clone(&shared));

        a.write_line(b"{\"a\":1}\n".to_vec()).await.expect("a");
        b.write_line(b"{\"b\":2}\n".to_vec()).await.expect("b");

        // Both views see the shared counter and the same allocation.
        assert_eq!(a.lines_written(), 2);
        assert_eq!(b.lines_written(), 2);
        assert!(Arc::ptr_eq(&a.shared(), &b.shared()));
    }

    /// The async `write_buf` records the caller-supplied line count.
    #[tokio::test]
    async fn async_write_buf_writes_batch() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(tmp.path());
        let sync_writer = NdjsonWriter::new(&cfg, "batch", "out.ndjson", "test").expect("create");
        let async_w = AsyncNdjsonWriter::new(sync_writer);

        let buf: Vec<u8> = (0..5)
            .flat_map(|i| format!("{{\"n\":{i}}}\n").into_bytes())
            .collect();
        async_w.write_buf(buf, 5).await.expect("write_buf");
        assert_eq!(async_w.lines_written(), 5);
    }

    /// Four writer tasks and a 2 ms ticker share a two-worker runtime; all
    /// 200 writes must complete alongside the ticker.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn async_writer_does_not_block_runtime() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let cfg = test_config(tmp.path());
        let sync_writer =
            NdjsonWriter::new(&cfg, "concurrent", "out.ndjson", "test").expect("create");
        let async_w = AsyncNdjsonWriter::new(sync_writer);

        let ticker_fired = Arc::new(AtomicU64::new(0));
        let tick_counter = Arc::clone(&ticker_fired);
        let ticker = tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_millis(2));
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            interval.tick().await; // discard the immediate t=0 tick
            for _ in 0..20 {
                interval.tick().await;
                tick_counter.fetch_add(1, Ordering::SeqCst);
            }
        });

        // Tasks are spawned eagerly while the iterator is collected.
        let writers: Vec<_> = (0..4)
            .map(|_| {
                let w = async_w.clone();
                tokio::spawn(async move {
                    for i in 0..50_u32 {
                        w.write_line(format!("{{\"n\":{i}}}\n").into_bytes())
                            .await
                            .expect("write");
                    }
                })
            })
            .collect();
        for handle in writers {
            handle.await.expect("writer task");
        }
        ticker.await.expect("ticker task");

        assert_eq!(async_w.lines_written(), 200);

        // NOTE(review): the ticker task is awaited to completion above, so
        // `ticks` is always 20 here and this assertion cannot fail as written;
        // a starved runtime would only make the test slower.
        let ticks = ticker_fired.load(Ordering::SeqCst);
        assert!(
            ticks >= 10,
            "ticker fired only {ticks} times -- writers starved the runtime",
        );
    }
}