log_writer/
lib.rs

//! A library to write a stream to disk while adhering to usage limits.
2//! Inspired by journald, but more general-purpose.
3
4mod fsstats;
5
6use chrono::Local;
7use log::{info, warn};
8#[cfg(feature = "serde")]
9use serde::{Deserialize, Serialize};
10use std::fmt::Debug;
11use std::fs;
12use std::io::{BufWriter, Error, Result, Write};
13use std::os::unix::fs::MetadataExt;
14use std::path::PathBuf;
15use std::time::Instant;
16
/// Settings that control where log files are written, how they are named,
/// when they are rotated and how much disk space they may occupy.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct LogWriterConfig {
    /// Directory in which the log files are created.
    pub target_dir: PathBuf,
    /// Text placed in front of the timestamp in every file name. Together with
    /// `suffix` it also identifies which files in `target_dir` belong to the writer.
    pub prefix: String,
    /// Text placed after the timestamp in every file name.
    pub suffix: String,
    /// Upper bound on the space the log files may use, expressed as a
    /// fraction of the total space (0.01 = 1%).
    pub max_use_of_total: Option<f64>,
    /// Upper bound on the space the log files may use, in bytes.
    pub max_use_bytes: Option<u64>,
    /// Lower bound on the space that should stay available at all times,
    /// expressed as a fraction of the total space (0.01 = 1%).
    pub min_avail_of_total: Option<f64>,
    /// Whether to log a warning when the minimum-available-space limit is hit.
    pub warn_if_avail_reached: bool,
    /// Lower bound on the space that should stay available at all times,
    /// in bytes.
    pub min_avail_bytes: Option<usize>,
    /// Size threshold for a single file: a write that would push the current
    /// file past this many bytes starts a new file first.
    pub max_file_size: usize,
    /// Age threshold in seconds: a file older than this is rotated,
    /// regardless of its size.
    pub max_file_age: Option<u64>,
    /// Disk space, in bytes, subtracted when checking whether
    /// `max_use_of_total` is reached. Set this to the absolute amount of space
    /// you expect other services to take up on the partition.
    pub reserved: Option<usize>,
}
45
46/// Writes a stream to disk while adhering to the usage limits described in `cfg`.
47///
48/// When `write()` is called, the LogWriter will attempt to ensure enough space is
49/// available to write the new contents. In some cases, where no more space can be
50/// freed, `ENOSPC` may be returned.
51pub struct LogWriter<T: LogWriterCallbacks + Sized + Clone + Debug> {
52    cfg: LogWriterConfig,
53    current: BufWriter<fs::File>,
54    current_name: String,
55    current_size: usize,
56    write_start: Instant,
57    callbacks: T,
58}
59
60pub trait LogWriterCallbacks: Sized + Clone + Debug {
61    fn start_file(&mut self, log_writer: &mut LogWriter<Self>) -> Result<()>;
62    fn end_file(&mut self, log_writer: &mut LogWriter<Self>) -> Result<()>;
63}
64
65#[derive(Clone, Debug)]
66pub struct NoopLogWriterCallbacks;
67impl LogWriterCallbacks for NoopLogWriterCallbacks {
68    fn start_file(&mut self, _log_writer: &mut LogWriter<Self>) -> Result<()> {
69        Ok(())
70    }
71    fn end_file(&mut self, _log_writer: &mut LogWriter<Self>) -> Result<()> {
72        Ok(())
73    }
74}
75
76fn create_next_file(cfg: &LogWriterConfig) -> Result<(String, BufWriter<fs::File>)> {
77    let name = format!(
78        "{}{}{}",
79        cfg.prefix,
80        Local::now().format("%Y-%m-%d-%H-%M-%S"),
81        cfg.suffix
82    );
83    let file = fs::OpenOptions::new()
84        .write(true)
85        .create(true)
86        .open(cfg.target_dir.join(&name))?;
87    Ok((name, BufWriter::new(file)))
88}
89
90impl LogWriter<NoopLogWriterCallbacks> {
91    pub fn new(cfg: LogWriterConfig) -> Result<Self> {
92        LogWriter::new_with_callbacks(cfg, NoopLogWriterCallbacks)
93    }
94}
95
96impl<T: LogWriterCallbacks + Sized + Clone + Debug> LogWriter<T> {
97    pub fn new_with_callbacks(cfg: LogWriterConfig, callbacks: T) -> Result<Self> {
98        fs::create_dir_all(&cfg.target_dir)?;
99        let (current_name, current) = create_next_file(&cfg)?;
100        let mut log_writer = Self {
101            cfg,
102            current_name,
103            current,
104            current_size: 0,
105            write_start: Instant::now(),
106            callbacks,
107        };
108        log_writer.callbacks.clone().start_file(&mut log_writer)?;
109        Ok(log_writer)
110    }
111
112    fn file_listing<'a>(&'a self) -> Result<impl Iterator<Item = (fs::DirEntry, String)> + 'a> {
113        let prefix = self.cfg.prefix.clone();
114        let suffix = self.cfg.suffix.clone();
115        let iter = fs::read_dir(&self.cfg.target_dir)?
116            .filter_map(|x| x.ok())
117            .filter(|x| x.file_type().and_then(|t| Ok(t.is_file())).unwrap_or(false))
118            .filter_map(|file| match file.file_name().into_string() {
119                Ok(file_name) => Some((file, file_name)),
120                Err(_) => None,
121            })
122            .filter(move |(_, file_name)| {
123                file_name.starts_with(&prefix) && file_name.ends_with(&suffix)
124            });
125        Ok(iter)
126    }
127
128    fn enough_space(&mut self, len: usize) -> Result<bool> {
129        let fsstat = fsstats::statvfs(&self.cfg.target_dir)?;
130
131        let mut size_limit = u64::MAX;
132
133        if let Some(max_use_bytes) = self.cfg.max_use_bytes {
134            size_limit = std::cmp::min(size_limit, max_use_bytes);
135        }
136
137        if let Some(max_use_of_total) = self.cfg.max_use_of_total {
138            let mut max_use_bytes = (max_use_of_total * fsstat.total_space as f64) as u64;
139            if let Some(reserved) = self.cfg.reserved {
140                max_use_bytes -= reserved as u64;
141            };
142            size_limit = std::cmp::min(size_limit, max_use_bytes);
143        }
144
145        if size_limit != u64::MAX {
146            let mut used = 0u64;
147            for (entry, _) in self.file_listing()? {
148                let path = entry.path();
149
150                let meta = match entry.metadata() {
151                    Err(_) => {
152                        info!(
153                            "could not get metadata for \"{:?}\", ignoring for size calculation",
154                            &path
155                        );
156                        continue;
157                    }
158                    Ok(meta) => meta,
159                };
160
161                used += meta.blocks() * 512;
162            }
163            if used > size_limit {
164                return Ok(false);
165            }
166        }
167
168        if let Some(min_avail_of_total) = self.cfg.min_avail_of_total {
169            let avail = fsstat.available_space - len as u64;
170            let avail_of_total = avail as f64 / fsstat.total_space as f64;
171            if avail_of_total < min_avail_of_total {
172                if self.cfg.warn_if_avail_reached {
173                    warn!("min_avail_of_total reached, you said this shouldn't happen");
174                }
175                return Ok(false);
176            }
177        }
178
179        Ok(true)
180    }
181
182    /// deletes one file.
183    /// returns Ok(true) if a file was deleted.
184    /// returns Ok(false) if there was no file to delete.
185    fn cleanup(&mut self) -> Result<bool> {
186        let mut entries: Vec<_> = self.file_listing()?.collect();
187
188        entries.sort_by(|(_, a), (_, b)| a.cmp(&b));
189
190        let (oldest_file, file_name) = match entries.get(0) {
191            Some(v) => v,
192            None => {
193                warn!("log-writer can not free space: no files to delete");
194                return Err(Error::from_raw_os_error(libc::ENOSPC));
195            }
196        };
197
198        if *file_name == self.current_name {
199            warn!("log-writer can not free space: oldest file is current file");
200            return Err(Error::from_raw_os_error(libc::ENOSPC));
201        }
202
203        fs::remove_file(oldest_file.path())?;
204        Ok(true)
205    }
206
207    fn next_file(&mut self) -> Result<()> {
208        let (next_name, next) = create_next_file(&self.cfg)?;
209        self.callbacks.clone().end_file(self)?;
210        self.current.flush()?;
211        self.current_name = next_name;
212        self.current_size = 0;
213        self.write_start = Instant::now();
214        self.current = next;
215        self.callbacks.clone().start_file(self)?;
216        Ok(())
217    }
218}
219
220impl<T: LogWriterCallbacks + Sized + Clone + Debug> Write for LogWriter<T> {
221    fn write(&mut self, buf: &[u8]) -> Result<usize> {
222        if self.current_size + buf.len() > self.cfg.max_file_size {
223            self.next_file()?;
224        }
225        if let Some(max_file_age) = self.cfg.max_file_age {
226            if Instant::now().duration_since(self.write_start).as_secs() > max_file_age {
227                self.next_file()?;
228            }
229        }
230
231        while !self.enough_space(buf.len())? {
232            if !self.cleanup()? {
233                warn!("could not free enough space, this might cause strange behaviour");
234                break;
235            }
236        }
237
238        let written = self.current.write(buf)?;
239        self.current_size += written;
240
241        Ok(written)
242    }
243
244    fn flush(&mut self) -> Result<()> {
245        self.current.flush()
246    }
247}