hyperi_rustlib/output/
file.rs1use std::sync::Arc;
38
39use tracing::debug;
40
41use crate::io::NdjsonWriter;
42
43use super::config::FileOutputConfig;
44use super::error::OutputError;
45
46#[derive(Clone)]
51pub struct FileOutput {
52 writer: Arc<NdjsonWriter>,
53}
54
55impl std::fmt::Debug for FileOutput {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.debug_struct("FileOutput")
58 .field("lines_written", &self.writer.lines_written())
59 .field("write_errors", &self.writer.write_errors())
60 .field("output_path", self.writer.output_path())
61 .finish_non_exhaustive()
62 }
63}
64
65impl FileOutput {
66 pub fn new(config: &FileOutputConfig, service_name: &str) -> Result<Self, OutputError> {
80 if !config.enabled {
81 return Err(OutputError::Disabled);
82 }
83
84 let writer_config = config.to_writer_config();
85 let writer = NdjsonWriter::new(&writer_config, service_name, &config.filename, "output")?;
86
87 debug!(
88 service = service_name,
89 filename = %config.filename,
90 path = %config.path.display(),
91 "File output sink initialised"
92 );
93
94 Ok(Self {
95 writer: Arc::new(writer),
96 })
97 }
98
99 pub fn write(&self, data: &[u8]) -> Result<(), OutputError> {
103 if data.last() == Some(&b'\n') {
104 self.writer.write_line(data)?;
105 } else {
106 let mut line = Vec::with_capacity(data.len() + 1);
107 line.extend_from_slice(data);
108 line.push(b'\n');
109 self.writer.write_line(&line)?;
110 }
111 Ok(())
112 }
113
114 pub fn write_batch(&self, data: &[&[u8]]) -> Result<(), OutputError> {
117 if data.is_empty() {
118 return Ok(());
119 }
120
121 let total_len: usize = data.iter().map(|d| d.len() + 1).sum();
122 let mut buf = Vec::with_capacity(total_len);
123 for entry in data {
124 buf.extend_from_slice(entry);
125 if entry.last() != Some(&b'\n') {
126 buf.push(b'\n');
127 }
128 }
129
130 let count = data.len() as u64;
131 self.writer.write_buf(&buf, count)?;
132 Ok(())
133 }
134
135 pub async fn write_async(&self, data: Vec<u8>) -> Result<(), OutputError> {
143 let writer = Arc::clone(&self.writer);
144 tokio::task::spawn_blocking(move || -> Result<(), OutputError> {
145 let line: &[u8] = if data.last() == Some(&b'\n') {
146 &data
147 } else {
148 return {
150 let mut line = Vec::with_capacity(data.len() + 1);
151 line.extend_from_slice(&data);
152 line.push(b'\n');
153 writer.write_line(&line).map_err(OutputError::from)
154 };
155 };
156 writer.write_line(line).map_err(OutputError::from)
157 })
158 .await
159 .map_err(|e| OutputError::Io(std::io::Error::other(e)))?
160 }
161
162 pub async fn write_batch_async(&self, data: Vec<Vec<u8>>) -> Result<(), OutputError> {
169 if data.is_empty() {
170 return Ok(());
171 }
172 let writer = Arc::clone(&self.writer);
173 tokio::task::spawn_blocking(move || -> Result<(), OutputError> {
174 let total_len: usize = data.iter().map(|d| d.len() + 1).sum();
175 let mut buf = Vec::with_capacity(total_len);
176 for entry in &data {
177 buf.extend_from_slice(entry);
178 if entry.last() != Some(&b'\n') {
179 buf.push(b'\n');
180 }
181 }
182 let count = data.len() as u64;
183 writer.write_buf(&buf, count).map_err(OutputError::from)
184 })
185 .await
186 .map_err(|e| OutputError::Io(std::io::Error::other(e)))?
187 }
188
189 #[must_use]
191 pub fn lines_written(&self) -> u64 {
192 self.writer.lines_written()
193 }
194
195 #[must_use]
197 pub fn write_errors(&self) -> u64 {
198 self.writer.write_errors()
199 }
200
201 #[must_use]
205 pub fn shared_writer(&self) -> Arc<NdjsonWriter> {
206 Arc::clone(&self.writer)
207 }
208}
209
210#[cfg(test)]
211mod tests {
212 use super::*;
213 use crate::io::RotationPeriod;
214
215 fn test_config(dir: &std::path::Path) -> FileOutputConfig {
216 FileOutputConfig {
217 enabled: true,
218 path: dir.to_path_buf(),
219 filename: "events.ndjson".into(),
220 rotation: RotationPeriod::Daily,
221 max_age_days: 1,
222 compress_rotated: false,
223 }
224 }
225
226 #[test]
227 fn test_disabled_returns_error() {
228 let config = FileOutputConfig::default(); let result = FileOutput::new(&config, "test");
230 assert!(result.is_err());
231 assert!(
232 matches!(result.unwrap_err(), OutputError::Disabled),
233 "expected Disabled error"
234 );
235 }
236
237 #[test]
238 fn test_write_single() {
239 let dir = tempfile::tempdir().expect("tempdir");
240 let config = test_config(dir.path());
241 let output = FileOutput::new(&config, "test-svc").expect("create");
242
243 output.write(b"{\"event\":\"login\"}").expect("write");
244 assert_eq!(output.lines_written(), 1);
245
246 let content =
247 std::fs::read_to_string(dir.path().join("test-svc/events.ndjson")).expect("read");
248 assert_eq!(content.trim(), r#"{"event":"login"}"#);
249 }
250
251 #[test]
252 fn test_write_with_trailing_newline() {
253 let dir = tempfile::tempdir().expect("tempdir");
254 let config = test_config(dir.path());
255 let output = FileOutput::new(&config, "nl-svc").expect("create");
256
257 output.write(b"{\"event\":\"test\"}\n").expect("write");
258 assert_eq!(output.lines_written(), 1);
259
260 let content =
261 std::fs::read_to_string(dir.path().join("nl-svc/events.ndjson")).expect("read");
262 assert_eq!(content.trim(), r#"{"event":"test"}"#);
263 }
264
265 #[test]
266 fn test_write_batch() {
267 let dir = tempfile::tempdir().expect("tempdir");
268 let config = test_config(dir.path());
269 let output = FileOutput::new(&config, "batch-svc").expect("create");
270
271 let events: Vec<&[u8]> = vec![b"{\"n\":0}", b"{\"n\":1}", b"{\"n\":2}"];
272 output.write_batch(&events).expect("batch write");
273 assert_eq!(output.lines_written(), 3);
274
275 let content =
276 std::fs::read_to_string(dir.path().join("batch-svc/events.ndjson")).expect("read");
277 let lines: Vec<&str> = content.trim().lines().collect();
278 assert_eq!(lines.len(), 3);
279 assert_eq!(lines[0], r#"{"n":0}"#);
280 assert_eq!(lines[2], r#"{"n":2}"#);
281 }
282
283 #[test]
284 fn test_write_batch_empty() {
285 let dir = tempfile::tempdir().expect("tempdir");
286 let config = test_config(dir.path());
287 let output = FileOutput::new(&config, "empty-svc").expect("create");
288
289 output.write_batch(&[]).expect("empty batch");
290 assert_eq!(output.lines_written(), 0);
291 }
292
293 #[test]
294 fn test_debug_format() {
295 let dir = tempfile::tempdir().expect("tempdir");
296 let config = test_config(dir.path());
297 let output = FileOutput::new(&config, "dbg-svc").expect("create");
298
299 let debug = format!("{output:?}");
300 assert!(debug.contains("FileOutput"));
301 assert!(debug.contains("lines_written"));
302 }
303
304 #[tokio::test]
305 async fn write_async_writes_to_file() {
306 let dir = tempfile::tempdir().expect("tempdir");
307 let cfg = test_config(dir.path());
308 let output = FileOutput::new(&cfg, "async-svc").expect("create");
309
310 output
311 .write_async(b"{\"k\":\"v\"}".to_vec())
312 .await
313 .expect("write_async");
314 assert_eq!(output.lines_written(), 1);
315
316 let body =
317 std::fs::read_to_string(dir.path().join("async-svc/events.ndjson")).expect("read");
318 assert_eq!(body.trim(), r#"{"k":"v"}"#);
319 }
320
321 #[tokio::test]
322 async fn write_batch_async_writes_to_file() {
323 let dir = tempfile::tempdir().expect("tempdir");
324 let cfg = test_config(dir.path());
325 let output = FileOutput::new(&cfg, "ab-svc").expect("create");
326
327 let batch: Vec<Vec<u8>> = (0..3)
328 .map(|i| format!("{{\"n\":{i}}}").into_bytes())
329 .collect();
330 output.write_batch_async(batch).await.expect("batch async");
331 assert_eq!(output.lines_written(), 3);
332
333 let body = std::fs::read_to_string(dir.path().join("ab-svc/events.ndjson")).expect("read");
334 assert_eq!(body.trim().lines().count(), 3);
335 }
336
337 #[tokio::test]
338 async fn write_batch_async_empty_is_noop() {
339 let dir = tempfile::tempdir().expect("tempdir");
340 let cfg = test_config(dir.path());
341 let output = FileOutput::new(&cfg, "empty-async").expect("create");
342 output.write_batch_async(vec![]).await.expect("empty async");
343 assert_eq!(output.lines_written(), 0);
344 }
345
346 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
347 async fn write_async_does_not_block_runtime() {
348 let dir = tempfile::tempdir().expect("tempdir");
349 let cfg = test_config(dir.path());
350 let output = FileOutput::new(&cfg, "nb-svc").expect("create");
351
352 let ticks = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
353 let tc = ticks.clone();
354 let ticker = tokio::spawn(async move {
355 let mut t = tokio::time::interval(std::time::Duration::from_millis(2));
356 t.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
357 t.tick().await;
358 for _ in 0..15 {
359 t.tick().await;
360 tc.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
361 }
362 });
363
364 let mut writers = Vec::new();
365 for _ in 0..4 {
366 let o = output.clone();
367 writers.push(tokio::spawn(async move {
368 for i in 0..50_u32 {
369 o.write_async(format!("{{\"n\":{i}}}").into_bytes())
370 .await
371 .expect("write");
372 }
373 }));
374 }
375 for h in writers {
376 h.await.expect("writer task");
377 }
378 ticker.await.expect("ticker");
379
380 assert_eq!(output.lines_written(), 200);
381 let t = ticks.load(std::sync::atomic::Ordering::SeqCst);
382 assert!(
383 t >= 8,
384 "ticker fired only {t} times -- FileOutput starved the runtime",
385 );
386 }
387
388 #[tokio::test]
389 async fn clone_shares_writer() {
390 let dir = tempfile::tempdir().expect("tempdir");
391 let cfg = test_config(dir.path());
392 let a = FileOutput::new(&cfg, "share").expect("create");
393 let b = a.clone();
394
395 a.write_async(b"{\"a\":1}".to_vec()).await.expect("a");
396 b.write_async(b"{\"b\":2}".to_vec()).await.expect("b");
397
398 assert_eq!(a.lines_written(), 2);
399 assert_eq!(b.lines_written(), 2);
400 assert!(std::sync::Arc::ptr_eq(
401 &a.shared_writer(),
402 &b.shared_writer()
403 ));
404 }
405}