1use std::io::Write;
32use std::path::{Path, PathBuf};
33use std::sync::Arc;
34use std::sync::atomic::{AtomicU64, Ordering};
35
36use file_rotate::suffix::AppendTimestamp;
37use file_rotate::suffix::FileLimit;
38use file_rotate::{ContentLimit, FileRotate, compression::Compression};
39use parking_lot::Mutex;
40use tracing::debug;
41
42use super::config::{FileWriterConfig, RotationPeriod};
43
44pub struct NdjsonWriter {
49 writer: Mutex<FileRotate<AppendTimestamp>>,
50 label: String,
51 output_path: PathBuf,
52 lines_written: AtomicU64,
53 write_errors: AtomicU64,
54}
55
56impl std::fmt::Debug for NdjsonWriter {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("NdjsonWriter")
59 .field("label", &self.label)
60 .field("output_path", &self.output_path)
61 .field("lines_written", &self.lines_written.load(Ordering::Relaxed))
62 .field("write_errors", &self.write_errors.load(Ordering::Relaxed))
63 .finish_non_exhaustive()
64 }
65}
66
67impl NdjsonWriter {
68 pub fn new(
84 config: &FileWriterConfig,
85 subdir: &str,
86 filename: &str,
87 label: &str,
88 ) -> Result<Self, std::io::Error> {
89 let dir = config.path.join(subdir);
90 std::fs::create_dir_all(&dir)?;
91
92 let file_path = dir.join(filename);
93
94 let content_limit = match config.rotation {
95 RotationPeriod::Hourly => ContentLimit::Time(file_rotate::TimeFrequency::Hourly),
96 RotationPeriod::Daily => ContentLimit::Time(file_rotate::TimeFrequency::Daily),
97 };
98
99 let max_age = chrono::Duration::days(i64::from(config.max_age_days));
100 let suffix_scheme = AppendTimestamp::default(FileLimit::Age(max_age));
101
102 let compression = if config.compress_rotated {
103 Compression::OnRotate(6)
104 } else {
105 Compression::None
106 };
107
108 let writer = FileRotate::new(file_path, suffix_scheme, content_limit, compression, None);
109
110 debug!(
111 label = label,
112 path = %dir.display(),
113 rotation = ?config.rotation,
114 "{} writer initialised",
115 label,
116 );
117
118 Ok(Self {
119 writer: Mutex::new(writer),
120 label: label.to_string(),
121 output_path: dir,
122 lines_written: AtomicU64::new(0),
123 write_errors: AtomicU64::new(0),
124 })
125 }
126
127 pub fn write_line(&self, line: &[u8]) -> Result<(), std::io::Error> {
132 let mut writer = self.writer.lock();
133 if let Err(e) = writer.write_all(line).and_then(|()| writer.flush()) {
134 self.write_errors.fetch_add(1, Ordering::Relaxed);
135 return Err(e);
136 }
137 self.lines_written.fetch_add(1, Ordering::Relaxed);
138 Ok(())
139 }
140
141 pub fn write_buf(&self, buf: &[u8], count: u64) -> Result<(), std::io::Error> {
146 let mut writer = self.writer.lock();
147 if let Err(e) = writer.write_all(buf).and_then(|()| writer.flush()) {
148 self.write_errors.fetch_add(1, Ordering::Relaxed);
149 return Err(e);
150 }
151 self.lines_written.fetch_add(count, Ordering::Relaxed);
152 Ok(())
153 }
154
155 pub fn flush(&self) -> Result<(), std::io::Error> {
168 let mut writer = self.writer.lock();
169 if let Err(e) = writer.flush() {
170 self.write_errors.fetch_add(1, Ordering::Relaxed);
171 return Err(e);
172 }
173 Ok(())
174 }
175
176 pub fn lines_written(&self) -> u64 {
178 self.lines_written.load(Ordering::Relaxed)
179 }
180
181 pub fn write_errors(&self) -> u64 {
183 self.write_errors.load(Ordering::Relaxed)
184 }
185
186 pub fn label(&self) -> &str {
188 &self.label
189 }
190
191 pub fn output_path(&self) -> &PathBuf {
193 &self.output_path
194 }
195}
196
197#[derive(Debug, Clone)]
206pub struct AsyncNdjsonWriter {
207 inner: Arc<NdjsonWriter>,
208}
209
210impl AsyncNdjsonWriter {
211 #[must_use]
214 pub fn new(writer: NdjsonWriter) -> Self {
215 Self {
216 inner: Arc::new(writer),
217 }
218 }
219
220 #[must_use]
223 pub fn from_arc(writer: Arc<NdjsonWriter>) -> Self {
224 Self { inner: writer }
225 }
226
227 pub async fn write_line(&self, line: Vec<u8>) -> Result<(), std::io::Error> {
235 let inner = Arc::clone(&self.inner);
236 tokio::task::spawn_blocking(move || inner.write_line(&line))
237 .await
238 .map_err(std::io::Error::other)?
239 }
240
241 pub async fn write_buf(&self, buf: Vec<u8>, count: u64) -> Result<(), std::io::Error> {
247 let inner = Arc::clone(&self.inner);
248 tokio::task::spawn_blocking(move || inner.write_buf(&buf, count))
249 .await
250 .map_err(std::io::Error::other)?
251 }
252
253 pub async fn flush(&self) -> Result<(), std::io::Error> {
263 let inner = Arc::clone(&self.inner);
264 tokio::task::spawn_blocking(move || inner.flush())
265 .await
266 .map_err(std::io::Error::other)?
267 }
268
269 #[must_use]
271 pub fn lines_written(&self) -> u64 {
272 self.inner.lines_written()
273 }
274
275 #[must_use]
277 pub fn write_errors(&self) -> u64 {
278 self.inner.write_errors()
279 }
280
281 #[must_use]
283 pub fn label(&self) -> &str {
284 self.inner.label()
285 }
286
287 #[must_use]
289 pub fn output_path(&self) -> &Path {
290 self.inner.output_path().as_path()
291 }
292
293 #[must_use]
295 pub fn shared(&self) -> Arc<NdjsonWriter> {
296 Arc::clone(&self.inner)
297 }
298}
299
300#[cfg(test)]
301mod tests {
302 use super::*;
303
304 fn test_config(dir: &std::path::Path) -> FileWriterConfig {
305 FileWriterConfig {
306 path: dir.to_path_buf(),
307 rotation: RotationPeriod::Daily,
308 max_age_days: 1,
309 compress_rotated: false,
310 }
311 }
312
313 #[test]
314 fn test_write_single_line() {
315 let dir = tempfile::tempdir().expect("tempdir");
316 let config = test_config(dir.path());
317
318 let writer = NdjsonWriter::new(&config, "test-svc", "out.ndjson", "test").expect("create");
319 assert_eq!(writer.lines_written(), 0);
320 assert_eq!(writer.write_errors(), 0);
321
322 writer.write_line(b"{\"msg\":\"hello\"}\n").expect("write");
323 assert_eq!(writer.lines_written(), 1);
324
325 let content =
326 std::fs::read_to_string(dir.path().join("test-svc/out.ndjson")).expect("read");
327 assert_eq!(content.trim(), r#"{"msg":"hello"}"#);
328 }
329
330 #[test]
331 fn test_write_multiple_lines() {
332 let dir = tempfile::tempdir().expect("tempdir");
333 let config = test_config(dir.path());
334
335 let writer =
336 NdjsonWriter::new(&config, "multi", "events.ndjson", "output").expect("create");
337
338 for i in 0..3 {
339 let line = format!("{{\"n\":{i}}}\n");
340 writer.write_line(line.as_bytes()).expect("write");
341 }
342 assert_eq!(writer.lines_written(), 3);
343
344 let content =
345 std::fs::read_to_string(dir.path().join("multi/events.ndjson")).expect("read");
346 let lines: Vec<&str> = content.trim().lines().collect();
347 assert_eq!(lines.len(), 3);
348 }
349
350 #[test]
351 fn test_write_buf_batch() {
352 let dir = tempfile::tempdir().expect("tempdir");
353 let config = test_config(dir.path());
354
355 let writer = NdjsonWriter::new(&config, "batch", "out.ndjson", "test").expect("create");
356
357 let mut buf = Vec::new();
358 for i in 0..5 {
359 buf.extend_from_slice(format!("{{\"n\":{i}}}\n").as_bytes());
360 }
361 writer.write_buf(&buf, 5).expect("write batch");
362 assert_eq!(writer.lines_written(), 5);
363
364 let content = std::fs::read_to_string(dir.path().join("batch/out.ndjson")).expect("read");
365 let lines: Vec<&str> = content.trim().lines().collect();
366 assert_eq!(lines.len(), 5);
367 }
368
369 #[test]
370 fn test_debug_format() {
371 let dir = tempfile::tempdir().expect("tempdir");
372 let config = test_config(dir.path());
373
374 let writer = NdjsonWriter::new(&config, "dbg", "out.ndjson", "dlq").expect("create");
375 let debug = format!("{writer:?}");
376 assert!(debug.contains("NdjsonWriter"));
377 assert!(debug.contains("dlq"));
378 }
379
380 #[test]
381 fn test_label_and_path() {
382 let dir = tempfile::tempdir().expect("tempdir");
383 let config = test_config(dir.path());
384
385 let writer = NdjsonWriter::new(&config, "svc", "data.ndjson", "output").expect("create");
386 assert_eq!(writer.label(), "output");
387 assert_eq!(writer.output_path(), &dir.path().join("svc"));
388 }
389
390 #[tokio::test]
396 async fn async_write_line_writes_to_file() {
397 let dir = tempfile::tempdir().expect("tempdir");
398 let cfg = test_config(dir.path());
399 let writer = NdjsonWriter::new(&cfg, "async-svc", "out.ndjson", "test").expect("create");
400 let async_w = AsyncNdjsonWriter::new(writer);
401
402 async_w
403 .write_line(b"{\"k\":\"v\"}\n".to_vec())
404 .await
405 .expect("write_line");
406 assert_eq!(async_w.lines_written(), 1);
407 assert_eq!(async_w.write_errors(), 0);
408 assert_eq!(async_w.label(), "test");
409 assert_eq!(async_w.output_path(), dir.path().join("async-svc"));
410
411 let body = std::fs::read_to_string(dir.path().join("async-svc/out.ndjson")).expect("read");
412 assert_eq!(body.trim(), r#"{"k":"v"}"#);
413 }
414
415 #[tokio::test]
416 async fn async_writer_from_arc_shares_state() {
417 let dir = tempfile::tempdir().expect("tempdir");
418 let cfg = test_config(dir.path());
419 let writer = NdjsonWriter::new(&cfg, "share", "out.ndjson", "test").expect("create");
420 let shared = Arc::new(writer);
421 let a = AsyncNdjsonWriter::from_arc(Arc::clone(&shared));
422 let b = AsyncNdjsonWriter::from_arc(Arc::clone(&shared));
423
424 a.write_line(b"{\"a\":1}\n".to_vec()).await.expect("a");
425 b.write_line(b"{\"b\":2}\n".to_vec()).await.expect("b");
426
427 assert_eq!(a.lines_written(), 2);
429 assert_eq!(b.lines_written(), 2);
430 assert!(Arc::ptr_eq(&a.shared(), &b.shared()));
431 }
432
433 #[tokio::test]
434 async fn async_write_buf_writes_batch() {
435 let dir = tempfile::tempdir().expect("tempdir");
436 let cfg = test_config(dir.path());
437 let writer = NdjsonWriter::new(&cfg, "batch", "out.ndjson", "test").expect("create");
438 let async_w = AsyncNdjsonWriter::new(writer);
439
440 let mut buf = Vec::new();
441 for i in 0..5 {
442 buf.extend_from_slice(format!("{{\"n\":{i}}}\n").as_bytes());
443 }
444 async_w.write_buf(buf, 5).await.expect("write_buf");
445 assert_eq!(async_w.lines_written(), 5);
446 }
447
448 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
449 async fn async_writer_does_not_block_runtime() {
450 let dir = tempfile::tempdir().expect("tempdir");
454 let cfg = test_config(dir.path());
455 let writer = NdjsonWriter::new(&cfg, "concurrent", "out.ndjson", "test").expect("create");
456 let async_w = AsyncNdjsonWriter::new(writer);
457
458 let ticker_fired = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
459 let tf = ticker_fired.clone();
460 let ticker = tokio::spawn(async move {
461 let mut t = tokio::time::interval(std::time::Duration::from_millis(2));
462 t.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
463 t.tick().await; for _ in 0..20 {
465 t.tick().await;
466 tf.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
467 }
468 });
469
470 let mut writers = Vec::new();
471 for _ in 0..4 {
472 let w = async_w.clone();
473 writers.push(tokio::spawn(async move {
474 for i in 0..50_u32 {
475 w.write_line(format!("{{\"n\":{i}}}\n").into_bytes())
476 .await
477 .expect("write");
478 }
479 }));
480 }
481 for h in writers {
482 h.await.expect("writer task");
483 }
484 ticker.await.expect("ticker task");
485
486 assert_eq!(async_w.lines_written(), 200);
487 let ticks = ticker_fired.load(std::sync::atomic::Ordering::SeqCst);
488 assert!(
489 ticks >= 10,
490 "ticker fired only {ticks} times -- writers starved the runtime",
491 );
492 }
493}