libdd_log/
writers.rs

1// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::logger::FileConfig;
5use crate::logger::StdTarget;
6use chrono;
7use std::io::Write;
8use std::path::{Path, PathBuf};
9use std::{fs, io};
10use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
11use tracing_subscriber::fmt::MakeWriter;
12
13/// A custom file appender that handles optional size-based rotation
14/// tokio doesn't support file rotation, so we need to implement it ourselves.
15/// https://github.com/tokio-rs/tracing/pull/2497
16struct CustomFileAppender {
17    path: PathBuf,
18    current_size: u64,
19    max_size: u64,
20    max_files: u64,
21    current_file: fs::File,
22}
23
24impl CustomFileAppender {
25    fn new(config: &FileConfig) -> io::Result<Self> {
26        let path = Path::new(&config.path).to_path_buf();
27        let file = fs::OpenOptions::new()
28            .create(true)
29            .append(true)
30            .open(&path)?;
31
32        let current_size = file.metadata()?.len();
33
34        Ok(Self {
35            path,
36            current_size,
37            max_size: config.max_size_bytes,
38            max_files: config.max_files,
39            current_file: file,
40        })
41    }
42
43    /// Get the current timestamp as a string in the format YYYY-MM-DD_HH-MM-SS-MS
44    fn get_timestamp_string() -> String {
45        let now = chrono::Local::now();
46        let formatted = format!(
47            "{}-{:03}",
48            now.format("%Y-%m-%d_%H-%M-%S"),
49            now.timestamp_subsec_millis()
50        );
51        formatted
52    }
53
54    /// Build the rotated file path.
55    /// The rotated files names are appended with their rotated at timestamp.
56    /// The file extension is preserved.
57    /// If the file has no extension, the timestamp is appended without a dot.
58    fn build_rotated_path(&self, timestamp: &str) -> PathBuf {
59        match (self.path.file_stem(), self.path.extension()) {
60            (Some(stem), Some(ext)) => {
61                let parent = self.path.parent().unwrap_or_else(|| Path::new("."));
62                parent.join(format!(
63                    "{}_{}.{}",
64                    stem.to_string_lossy(),
65                    timestamp,
66                    ext.to_string_lossy()
67                ))
68            }
69            (Some(stem), None) => {
70                let parent = self.path.parent().unwrap_or_else(|| Path::new("."));
71                parent.join(format!("{}_{}", stem.to_string_lossy(), timestamp))
72            }
73            (None, _) => PathBuf::from(format!("{}_{}", self.path.display(), timestamp)),
74        }
75    }
76
77    /// Rotate the file if it exceeds the maximum size.
78    /// If the file exceeds the maximum size, it will be renamed to a new file with a timestamp
79    /// and the current file will be closed.
80    /// If the maximum number of files is exceeded, the oldest rotated files will be deleted.
81    /// The rotated files names are appended with their rotated at timestamp.
82    fn rotate_if_needed(&mut self) -> io::Result<()> {
83        if self.max_size > 0 && self.current_size >= self.max_size {
84            self.current_file.flush()?;
85
86            let timestamp = Self::get_timestamp_string();
87            let new_path = self.build_rotated_path(&timestamp);
88
89            fs::rename(&self.path, new_path)?;
90
91            self.current_file = fs::OpenOptions::new()
92                .create(true)
93                .append(true)
94                .open(&self.path)?;
95
96            self.current_size = 0;
97
98            if self.max_files > 0 {
99                self.cleanup_old_files(self.max_files)?;
100            }
101        }
102        Ok(())
103    }
104
105    /// Cleanup old files when the maximum number of files is exceeded.
106    /// The files are sorted by timestamp (newest first) to ensure we keep the most recent files
107    /// and delete the oldest ones when cleanup is needed.
108    /// The current file is never deleted.
109    fn cleanup_old_files(&self, max_files: u64) -> io::Result<()> {
110        if max_files == 0 {
111            return Ok(());
112        }
113
114        let parent_dir = self.path.parent().unwrap_or_else(|| Path::new("."));
115        let (base_name, extension) = match (self.path.file_stem(), self.path.extension()) {
116            (Some(stem), ext) => (
117                stem.to_string_lossy().to_string(),
118                ext.map(|e| e.to_string_lossy().to_string()),
119            ),
120            _ => return Ok(()),
121        };
122
123        let mut rotated_files: Vec<_> = fs::read_dir(parent_dir)?
124            .filter_map(|entry| entry.ok())
125            .filter_map(|entry| {
126                let file_name = entry.file_name().to_string_lossy().to_string();
127
128                let expected_prefix = format!("{base_name}_");
129                if file_name.starts_with(&expected_prefix) {
130                    match &extension {
131                        Some(ext) => {
132                            if file_name.ends_with(&format!(".{ext}")) {
133                                let timestamp_part = &file_name
134                                    [expected_prefix.len()..file_name.len() - ext.len() - 1];
135                                Some((entry.path(), timestamp_part.to_string()))
136                            } else {
137                                None
138                            }
139                        }
140                        None => {
141                            let timestamp_part = &file_name[expected_prefix.len()..];
142                            Some((entry.path(), timestamp_part.to_string()))
143                        }
144                    }
145                } else {
146                    None
147                }
148            })
149            .collect();
150
151        // Sort by timestamp (newest first) - this ensures we keep the most recent files
152        // and delete the oldest ones when cleanup is needed
153        rotated_files.sort_by(|(_, timestamp_a), (_, timestamp_b)| timestamp_b.cmp(timestamp_a));
154
155        let max_rotated_files = max_files.saturating_sub(1);
156
157        if rotated_files.len() > max_rotated_files as usize {
158            let mut cleanup_errors = Vec::new();
159
160            for (file_path, _) in rotated_files.iter().skip(max_rotated_files as usize) {
161                if let Err(e) = fs::remove_file(file_path) {
162                    cleanup_errors.push(format!("{}: {}", file_path.display(), e));
163                }
164            }
165
166            if !cleanup_errors.is_empty() {
167                return Err(io::Error::other(format!(
168                    "Failed to remove old log files: {}",
169                    cleanup_errors.join(", ")
170                )));
171            }
172        }
173
174        Ok(())
175    }
176}
177
178impl Write for CustomFileAppender {
179    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
180        self.rotate_if_needed()?;
181        let written = self.current_file.write(buf)?;
182        self.current_size += written as u64;
183        Ok(written)
184    }
185
186    fn flush(&mut self) -> io::Result<()> {
187        self.current_file.flush()
188    }
189
190    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
191        self.rotate_if_needed()?;
192        self.current_file.write_all(buf)?;
193        self.current_size += buf.len() as u64;
194        Ok(())
195    }
196}
197
198/// A non-blocking writer that writes log output to a file.
199///
200/// Uses a background thread to handle writes asynchronously, which improves
201/// performance by not blocking the logging thread. The background thread is
202/// managed by the internal `WorkerGuard`.
203pub struct FileWriter {
204    non_blocking: NonBlocking,
205    /// The WorkerGuard is crucial for the non-blocking writer's functionality.
206    ///
207    /// The guard represents ownership of the background worker thread that processes
208    /// writes asynchronously. When the guard is dropped, it ensures:
209    /// 1. All pending writes are flushed
210    /// 2. The worker thread is properly shut down
211    /// 3. No writes are lost
212    ///
213    /// If we don't keep the guard alive for the entire lifetime of the writer:
214    /// - The worker thread might be shut down prematurely
215    /// - Pending writes could be lost
216    /// - The non-blocking writer would stop functioning
217    ///
218    /// That's why we store it in the struct and name it with a leading underscore
219    /// to indicate it's intentionally unused but must be kept alive.
220    _guard: WorkerGuard,
221}
222
223impl FileWriter {
224    /// Creates a new file writer that writes to the specified path.
225    ///
226    /// If the parent directory doesn't exist, it will be created.
227    /// The file will be opened in append mode.
228    /// If size_bytes is specified in the config, the file will be rotated when it reaches that
229    /// size.
230    pub fn new(config: &FileConfig) -> io::Result<Self> {
231        let path = Path::new(&config.path);
232        if let Some(parent) = path.parent() {
233            fs::create_dir_all(parent)?;
234        }
235
236        let file_appender = CustomFileAppender::new(config)?;
237        let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
238
239        Ok(Self {
240            non_blocking,
241            _guard: guard,
242        })
243    }
244}
245
246impl Write for FileWriter {
247    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
248        self.non_blocking.write(buf)
249    }
250
251    fn flush(&mut self) -> io::Result<()> {
252        self.non_blocking.flush()
253    }
254}
255
256impl<'a> MakeWriter<'a> for FileWriter {
257    type Writer = NonBlocking;
258
259    fn make_writer(&'a self) -> Self::Writer {
260        self.non_blocking.clone()
261    }
262}
263
264/// A writer that writes log output to standard output or standard error.
265pub struct StdWriter {
266    target: StdTarget,
267}
268
269impl StdWriter {
270    /// Creates a new writer that writes to the specified standard stream.
271    pub fn new(target: StdTarget) -> Self {
272        Self { target }
273    }
274}
275
276impl Write for StdWriter {
277    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
278        match self.target {
279            StdTarget::Out => io::stdout().write(buf),
280            StdTarget::Err => io::stderr().write(buf),
281        }
282    }
283
284    fn flush(&mut self) -> io::Result<()> {
285        match self.target {
286            StdTarget::Out => io::stdout().flush(),
287            StdTarget::Err => io::stderr().flush(),
288        }
289    }
290}
291
292impl<'a> MakeWriter<'a> for StdWriter {
293    type Writer = StdWriter;
294
295    fn make_writer(&'a self) -> Self::Writer {
296        StdWriter::new(self.target)
297    }
298}
299
300#[cfg(test)]
301mod tests {
302    use super::*;
303    use std::fs;
304    use tempfile::TempDir;
305    #[test]
306    #[cfg_attr(miri, ignore)]
307    fn test_file_writer_basic_functionality() {
308        let temp_dir = TempDir::new().unwrap();
309        let file_path = temp_dir.path().join("test.log");
310        let config = FileConfig {
311            path: file_path.to_str().unwrap().to_string(),
312            max_size_bytes: 0,
313            max_files: 0,
314        };
315
316        let mut writer = FileWriter::new(&config).unwrap();
317
318        let test_data = b"Hello, World!\n";
319        let written = writer.write(test_data).unwrap();
320        assert_eq!(written, test_data.len());
321
322        writer.flush().unwrap();
323
324        std::thread::sleep(std::time::Duration::from_millis(100));
325
326        assert!(file_path.exists());
327        let content = fs::read_to_string(&file_path).unwrap();
328        assert!(content.contains("Hello, World!"));
329    }
330
331    #[test]
332    #[cfg_attr(miri, ignore)]
333    fn test_file_writer_creates_directories() {
334        let temp_dir = TempDir::new().unwrap();
335        let nested_path = temp_dir
336            .path()
337            .join("subdir")
338            .join("nested")
339            .join("test.log");
340        let config = FileConfig {
341            path: nested_path.to_str().unwrap().to_string(),
342            max_size_bytes: 0,
343            max_files: 0,
344        };
345
346        let writer = FileWriter::new(&config);
347        assert!(writer.is_ok());
348
349        assert!(nested_path.parent().unwrap().exists());
350    }
351
352    #[test]
353    #[cfg_attr(miri, ignore)]
354    fn test_basic_rotation() {
355        let temp_dir = TempDir::new().unwrap();
356        let file_path = temp_dir.path().join("rotate.log");
357        let config = FileConfig {
358            path: file_path.to_str().unwrap().to_string(),
359            max_size_bytes: 5,
360            max_files: 0,
361        };
362
363        let mut appender = CustomFileAppender::new(&config).unwrap();
364
365        appender.write_all(b"123456").unwrap(); // 6 bytes > 5 byte limit
366        appender.write_all(b"X").unwrap(); // Triggers rotation
367
368        let parent_dir = file_path.parent().unwrap();
369        let file_count = fs::read_dir(parent_dir)
370            .unwrap()
371            .filter_map(|entry| entry.ok())
372            .filter(|entry| {
373                let file_name = entry.file_name();
374                let name = file_name.to_string_lossy();
375                name.starts_with("rotate") && name.ends_with(".log")
376            })
377            .count();
378
379        assert_eq!(file_count, 2);
380        assert!(file_path.exists());
381    }
382
383    #[test]
384    #[cfg_attr(miri, ignore)]
385    fn test_max_files_cleanup() {
386        let temp_dir = TempDir::new().unwrap();
387        let file_path = temp_dir.path().join("cleanup.log");
388        let config = FileConfig {
389            path: file_path.to_str().unwrap().to_string(),
390            max_size_bytes: 5,
391            max_files: 2,
392        };
393
394        let mut appender = CustomFileAppender::new(&config).unwrap();
395
396        for _ in 0..3 {
397            appender.write_all(b"123456").unwrap(); // Exceed limit
398            appender.write_all(b"X").unwrap(); // Trigger rotation
399            std::thread::sleep(std::time::Duration::from_millis(10)); // Ensure different timestamps
400        }
401
402        let parent_dir = file_path.parent().unwrap();
403        let file_count = fs::read_dir(parent_dir)
404            .unwrap()
405            .filter_map(|entry| entry.ok())
406            .filter(|entry| {
407                let file_name = entry.file_name();
408                let name = file_name.to_string_lossy();
409                name.starts_with("cleanup") && name.ends_with(".log")
410            })
411            .count();
412
413        assert_eq!(file_count, 2);
414        assert!(file_path.exists());
415    }
416
417    #[test]
418    #[cfg_attr(miri, ignore)]
419    fn test_max_files_one_keeps_only_current() {
420        let temp_dir = TempDir::new().unwrap();
421        let file_path = temp_dir.path().join("single.log");
422        let config = FileConfig {
423            path: file_path.to_str().unwrap().to_string(),
424            max_size_bytes: 5,
425            max_files: 1,
426        };
427
428        let mut appender = CustomFileAppender::new(&config).unwrap();
429
430        for _ in 0..2 {
431            appender.write_all(b"123456").unwrap(); // Exceed limit
432            appender.write_all(b"X").unwrap(); // Trigger rotation
433            std::thread::sleep(std::time::Duration::from_millis(10));
434        }
435
436        let parent_dir = file_path.parent().unwrap();
437        let file_count = fs::read_dir(parent_dir)
438            .unwrap()
439            .filter_map(|entry| entry.ok())
440            .filter(|entry| {
441                let file_name = entry.file_name();
442                let name = file_name.to_string_lossy();
443                name.starts_with("single") && name.ends_with(".log")
444            })
445            .count();
446
447        assert_eq!(file_count, 1);
448        assert!(file_path.exists());
449    }
450
451    #[test]
452    #[cfg_attr(miri, ignore)]
453    fn test_no_rotation_when_size_not_specified() {
454        let temp_dir = TempDir::new().unwrap();
455        let file_path = temp_dir.path().join("no_rotation.log");
456        let config = FileConfig {
457            path: file_path.to_str().unwrap().to_string(),
458            max_size_bytes: 0,
459            max_files: 0,
460        };
461
462        let mut appender = CustomFileAppender::new(&config).unwrap();
463
464        for _ in 0..10 {
465            appender.write_all(&vec![b'X'; 1000]).unwrap();
466        }
467
468        let parent_dir = file_path.parent().unwrap();
469        let file_count = fs::read_dir(parent_dir)
470            .unwrap()
471            .filter_map(|entry| entry.ok())
472            .filter(|entry| {
473                let file_name = entry.file_name();
474                let name = file_name.to_string_lossy();
475                name.starts_with("no_rotation") && name.ends_with(".log")
476            })
477            .count();
478
479        assert_eq!(file_count, 1);
480        assert!(file_path.exists());
481    }
482
483    #[test]
484    #[cfg_attr(miri, ignore)]
485    fn test_invalid_path_handling() {
486        let config = FileConfig {
487            path: String::new(),
488            max_size_bytes: 0,
489            max_files: 0,
490        };
491        let result = FileWriter::new(&config);
492        assert!(result.is_err());
493    }
494}