Skip to main content

hyperi_rustlib/output/
file.rs

1// Project:   hyperi-rustlib
2// File:      src/output/file.rs
3// Purpose:   File output sink using NdjsonWriter
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! File output sink for raw NDJSON events.
10//!
11//! Writes raw JSON bytes to rotating files using the shared [`NdjsonWriter`].
12//! Used for testing and bare-metal deployments where Kafka is not available.
13//!
14//! ## Sync vs async API
15//!
16//! - [`FileOutput::write`] / [`FileOutput::write_batch`] are SYNC -- they
17//!   call into the parking_lot-protected `NdjsonWriter` directly. Cheap
18//!   (~µs) but block the calling thread. Safe from sync code, tests, and
19//!   pre-runtime startup.
20//! - [`FileOutput::write_async`] / [`FileOutput::write_batch_async`] are
21//!   ASYNC -- they hand the sync work to `tokio::task::spawn_blocking` so
22//!   the tokio runtime is never stalled. Use these from `async fn`
23//!   bodies.
24//!
25//! Both APIs share the same underlying `Arc<NdjsonWriter>`, so counters
26//! and rotation state are consistent regardless of which path is taken.
27//!
28//! ## File Layout
29//!
30//! ```text
31//! /var/spool/dfe/output/loader/
32//! ├── events.ndjson              # Current file
33//! ├── events.ndjson.20260302T14  # Rotated (hourly)
34//! └── events.ndjson.20260302T13.gz  # Compressed
35//! ```
36
37use std::sync::Arc;
38
39use tracing::debug;
40
41use crate::io::NdjsonWriter;
42
43use super::config::FileOutputConfig;
44use super::error::OutputError;
45
46/// File output sink for raw NDJSON events.
47///
48/// Wraps [`NdjsonWriter`] with output-specific configuration and logging.
49/// Cheap to `Clone` -- the inner writer is shared via `Arc`.
50#[derive(Clone)]
51pub struct FileOutput {
52    writer: Arc<NdjsonWriter>,
53}
54
55impl std::fmt::Debug for FileOutput {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct("FileOutput")
58            .field("lines_written", &self.writer.lines_written())
59            .field("write_errors", &self.writer.write_errors())
60            .field("output_path", self.writer.output_path())
61            .finish_non_exhaustive()
62    }
63}
64
65impl FileOutput {
66    /// Create a new file output sink.
67    ///
68    /// Creates the output directory if it doesn't exist.
69    ///
70    /// # Arguments
71    ///
72    /// * `config` -- File output configuration
73    /// * `service_name` -- Used as subdirectory name (e.g. "loader", "receiver")
74    ///
75    /// # Errors
76    ///
77    /// Returns an error if the output directory cannot be created or the sink
78    /// is disabled.
79    pub fn new(config: &FileOutputConfig, service_name: &str) -> Result<Self, OutputError> {
80        if !config.enabled {
81            return Err(OutputError::Disabled);
82        }
83
84        let writer_config = config.to_writer_config();
85        let writer = NdjsonWriter::new(&writer_config, service_name, &config.filename, "output")?;
86
87        debug!(
88            service = service_name,
89            filename = %config.filename,
90            path = %config.path.display(),
91            "File output sink initialised"
92        );
93
94        Ok(Self {
95            writer: Arc::new(writer),
96        })
97    }
98
99    /// Write a single raw JSON bytes line (sync). Blocks the calling
100    /// thread on disk I/O. Safe from sync code; **never call from an
101    /// `async fn` body** -- use [`Self::write_async`] instead.
102    pub fn write(&self, data: &[u8]) -> Result<(), OutputError> {
103        if data.last() == Some(&b'\n') {
104            self.writer.write_line(data)?;
105        } else {
106            let mut line = Vec::with_capacity(data.len() + 1);
107            line.extend_from_slice(data);
108            line.push(b'\n');
109            self.writer.write_line(&line)?;
110        }
111        Ok(())
112    }
113
114    /// Write a batch of raw JSON bytes lines (sync). Same caveats as
115    /// [`Self::write`] -- use [`Self::write_batch_async`] from `async fn`.
116    pub fn write_batch(&self, data: &[&[u8]]) -> Result<(), OutputError> {
117        if data.is_empty() {
118            return Ok(());
119        }
120
121        let total_len: usize = data.iter().map(|d| d.len() + 1).sum();
122        let mut buf = Vec::with_capacity(total_len);
123        for entry in data {
124            buf.extend_from_slice(entry);
125            if entry.last() != Some(&b'\n') {
126                buf.push(b'\n');
127            }
128        }
129
130        let count = data.len() as u64;
131        self.writer.write_buf(&buf, count)?;
132        Ok(())
133    }
134
135    /// Async write -- runs the rotate-and-write on a blocking thread via
136    /// `tokio::task::spawn_blocking`. Hot-path safe for async callers.
137    ///
138    /// # Errors
139    ///
140    /// Returns the underlying [`OutputError`] from the sync writer, or
141    /// an `OutputError::Io` if the blocking thread panicked.
142    pub async fn write_async(&self, data: Vec<u8>) -> Result<(), OutputError> {
143        let writer = Arc::clone(&self.writer);
144        tokio::task::spawn_blocking(move || -> Result<(), OutputError> {
145            let line: &[u8] = if data.last() == Some(&b'\n') {
146                &data
147            } else {
148                // Borrow check: build the owned buffer in the same scope.
149                return {
150                    let mut line = Vec::with_capacity(data.len() + 1);
151                    line.extend_from_slice(&data);
152                    line.push(b'\n');
153                    writer.write_line(&line).map_err(OutputError::from)
154                };
155            };
156            writer.write_line(line).map_err(OutputError::from)
157        })
158        .await
159        .map_err(|e| OutputError::Io(std::io::Error::other(e)))?
160    }
161
162    /// Async batch write -- coalesces lines into a single buffer and runs
163    /// the rotate-and-write on a blocking thread.
164    ///
165    /// # Errors
166    ///
167    /// As [`Self::write_async`].
168    pub async fn write_batch_async(&self, data: Vec<Vec<u8>>) -> Result<(), OutputError> {
169        if data.is_empty() {
170            return Ok(());
171        }
172        let writer = Arc::clone(&self.writer);
173        tokio::task::spawn_blocking(move || -> Result<(), OutputError> {
174            let total_len: usize = data.iter().map(|d| d.len() + 1).sum();
175            let mut buf = Vec::with_capacity(total_len);
176            for entry in &data {
177                buf.extend_from_slice(entry);
178                if entry.last() != Some(&b'\n') {
179                    buf.push(b'\n');
180                }
181            }
182            let count = data.len() as u64;
183            writer.write_buf(&buf, count).map_err(OutputError::from)
184        })
185        .await
186        .map_err(|e| OutputError::Io(std::io::Error::other(e)))?
187    }
188
189    /// Number of lines successfully written.
190    #[must_use]
191    pub fn lines_written(&self) -> u64 {
192        self.writer.lines_written()
193    }
194
195    /// Number of write errors encountered.
196    #[must_use]
197    pub fn write_errors(&self) -> u64 {
198        self.writer.write_errors()
199    }
200
201    /// Shared `Arc<NdjsonWriter>` for callers that need both sync and
202    /// async access to the same underlying writer (e.g. building an
203    /// [`crate::io::AsyncNdjsonWriter`] view).
204    #[must_use]
205    pub fn shared_writer(&self) -> Arc<NdjsonWriter> {
206        Arc::clone(&self.writer)
207    }
208}
209
210#[cfg(test)]
211mod tests {
212    use super::*;
213    use crate::io::RotationPeriod;
214
215    fn test_config(dir: &std::path::Path) -> FileOutputConfig {
216        FileOutputConfig {
217            enabled: true,
218            path: dir.to_path_buf(),
219            filename: "events.ndjson".into(),
220            rotation: RotationPeriod::Daily,
221            max_age_days: 1,
222            compress_rotated: false,
223        }
224    }
225
226    #[test]
227    fn test_disabled_returns_error() {
228        let config = FileOutputConfig::default(); // enabled: false
229        let result = FileOutput::new(&config, "test");
230        assert!(result.is_err());
231        assert!(
232            matches!(result.unwrap_err(), OutputError::Disabled),
233            "expected Disabled error"
234        );
235    }
236
237    #[test]
238    fn test_write_single() {
239        let dir = tempfile::tempdir().expect("tempdir");
240        let config = test_config(dir.path());
241        let output = FileOutput::new(&config, "test-svc").expect("create");
242
243        output.write(b"{\"event\":\"login\"}").expect("write");
244        assert_eq!(output.lines_written(), 1);
245
246        let content =
247            std::fs::read_to_string(dir.path().join("test-svc/events.ndjson")).expect("read");
248        assert_eq!(content.trim(), r#"{"event":"login"}"#);
249    }
250
251    #[test]
252    fn test_write_with_trailing_newline() {
253        let dir = tempfile::tempdir().expect("tempdir");
254        let config = test_config(dir.path());
255        let output = FileOutput::new(&config, "nl-svc").expect("create");
256
257        output.write(b"{\"event\":\"test\"}\n").expect("write");
258        assert_eq!(output.lines_written(), 1);
259
260        let content =
261            std::fs::read_to_string(dir.path().join("nl-svc/events.ndjson")).expect("read");
262        assert_eq!(content.trim(), r#"{"event":"test"}"#);
263    }
264
265    #[test]
266    fn test_write_batch() {
267        let dir = tempfile::tempdir().expect("tempdir");
268        let config = test_config(dir.path());
269        let output = FileOutput::new(&config, "batch-svc").expect("create");
270
271        let events: Vec<&[u8]> = vec![b"{\"n\":0}", b"{\"n\":1}", b"{\"n\":2}"];
272        output.write_batch(&events).expect("batch write");
273        assert_eq!(output.lines_written(), 3);
274
275        let content =
276            std::fs::read_to_string(dir.path().join("batch-svc/events.ndjson")).expect("read");
277        let lines: Vec<&str> = content.trim().lines().collect();
278        assert_eq!(lines.len(), 3);
279        assert_eq!(lines[0], r#"{"n":0}"#);
280        assert_eq!(lines[2], r#"{"n":2}"#);
281    }
282
283    #[test]
284    fn test_write_batch_empty() {
285        let dir = tempfile::tempdir().expect("tempdir");
286        let config = test_config(dir.path());
287        let output = FileOutput::new(&config, "empty-svc").expect("create");
288
289        output.write_batch(&[]).expect("empty batch");
290        assert_eq!(output.lines_written(), 0);
291    }
292
293    #[test]
294    fn test_debug_format() {
295        let dir = tempfile::tempdir().expect("tempdir");
296        let config = test_config(dir.path());
297        let output = FileOutput::new(&config, "dbg-svc").expect("create");
298
299        let debug = format!("{output:?}");
300        assert!(debug.contains("FileOutput"));
301        assert!(debug.contains("lines_written"));
302    }
303
304    #[tokio::test]
305    async fn write_async_writes_to_file() {
306        let dir = tempfile::tempdir().expect("tempdir");
307        let cfg = test_config(dir.path());
308        let output = FileOutput::new(&cfg, "async-svc").expect("create");
309
310        output
311            .write_async(b"{\"k\":\"v\"}".to_vec())
312            .await
313            .expect("write_async");
314        assert_eq!(output.lines_written(), 1);
315
316        let body =
317            std::fs::read_to_string(dir.path().join("async-svc/events.ndjson")).expect("read");
318        assert_eq!(body.trim(), r#"{"k":"v"}"#);
319    }
320
321    #[tokio::test]
322    async fn write_batch_async_writes_to_file() {
323        let dir = tempfile::tempdir().expect("tempdir");
324        let cfg = test_config(dir.path());
325        let output = FileOutput::new(&cfg, "ab-svc").expect("create");
326
327        let batch: Vec<Vec<u8>> = (0..3)
328            .map(|i| format!("{{\"n\":{i}}}").into_bytes())
329            .collect();
330        output.write_batch_async(batch).await.expect("batch async");
331        assert_eq!(output.lines_written(), 3);
332
333        let body = std::fs::read_to_string(dir.path().join("ab-svc/events.ndjson")).expect("read");
334        assert_eq!(body.trim().lines().count(), 3);
335    }
336
337    #[tokio::test]
338    async fn write_batch_async_empty_is_noop() {
339        let dir = tempfile::tempdir().expect("tempdir");
340        let cfg = test_config(dir.path());
341        let output = FileOutput::new(&cfg, "empty-async").expect("create");
342        output.write_batch_async(vec![]).await.expect("empty async");
343        assert_eq!(output.lines_written(), 0);
344    }
345
346    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
347    async fn write_async_does_not_block_runtime() {
348        let dir = tempfile::tempdir().expect("tempdir");
349        let cfg = test_config(dir.path());
350        let output = FileOutput::new(&cfg, "nb-svc").expect("create");
351
352        let ticks = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
353        let tc = ticks.clone();
354        let ticker = tokio::spawn(async move {
355            let mut t = tokio::time::interval(std::time::Duration::from_millis(2));
356            t.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
357            t.tick().await;
358            for _ in 0..15 {
359                t.tick().await;
360                tc.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
361            }
362        });
363
364        let mut writers = Vec::new();
365        for _ in 0..4 {
366            let o = output.clone();
367            writers.push(tokio::spawn(async move {
368                for i in 0..50_u32 {
369                    o.write_async(format!("{{\"n\":{i}}}").into_bytes())
370                        .await
371                        .expect("write");
372                }
373            }));
374        }
375        for h in writers {
376            h.await.expect("writer task");
377        }
378        ticker.await.expect("ticker");
379
380        assert_eq!(output.lines_written(), 200);
381        let t = ticks.load(std::sync::atomic::Ordering::SeqCst);
382        assert!(
383            t >= 8,
384            "ticker fired only {t} times -- FileOutput starved the runtime",
385        );
386    }
387
388    #[tokio::test]
389    async fn clone_shares_writer() {
390        let dir = tempfile::tempdir().expect("tempdir");
391        let cfg = test_config(dir.path());
392        let a = FileOutput::new(&cfg, "share").expect("create");
393        let b = a.clone();
394
395        a.write_async(b"{\"a\":1}".to_vec()).await.expect("a");
396        b.write_async(b"{\"b\":2}".to_vec()).await.expect("b");
397
398        assert_eq!(a.lines_written(), 2);
399        assert_eq!(b.lines_written(), 2);
400        assert!(std::sync::Arc::ptr_eq(
401            &a.shared_writer(),
402            &b.shared_writer()
403        ));
404    }
405}