captains_log/
buf_file_impl.rs

1use crate::{
2    config::{LogFormat, SinkConfigTrait},
3    log_impl::{LogSink, LogSinkTrait},
4    rotation::*,
5    time::Timer,
6};
7use log::{Level, Record};
8use std::fs::metadata;
9use std::hash::{Hash, Hasher};
10use std::os::unix::prelude::*;
11use std::path::{Path, PathBuf};
12use std::sync::Once;
13use std::time::{Duration, SystemTime};
14
15use crate::file_impl::open_file;
16use crossfire::{MTx, RecvTimeoutError, Rx};
17use std::thread;
18
19/// Limit to 4k buf size, so that during reload or graceful restart,
20/// the line will not be break.
21pub const FLUSH_SIZE_DEFAULT: usize = 4096;
22
23/// Config for buffered file sink which merged I/O and delay flush.
24/// Optional log rotation can be configured.
25///
26/// Used when you don't have a SSD and the log is massive.
27///
28/// **When your program shutting down, should call flush to ensure the log is written to disk.**
29///
30/// ``` rust
31/// log::logger().flush();
32/// ```
33/// On panic, our panic hook will call `flush()` explicitly.
34///
35/// flush size default to be 4k to prevent line breaks on program (graceful) restart.
36///
37/// # Example
38///
39/// Source of [crate::recipe::buffered_file_logger_custom()]
40///
41/// ``` rust
42/// use captains_log::*;
43/// use std::path::{self, Path, PathBuf};
44///
45/// pub fn buffered_file_logger_custom<P: Into<PathBuf>>(
46///     file_path: P, max_level: Level, time_fmt: &'static str, format_func: FormatFunc,
47///     flush_millis: usize, rotate: Option<Rotation>,
48/// ) -> Builder {
49///     let format = LogFormat::new(time_fmt, format_func);
50///     let _file_path = file_path.into();
51///     let p = path::absolute(&_file_path).expect("path convert to absolute");
52///     let dir = p.parent().unwrap();
53///     let file_name = Path::new(p.file_name().unwrap());
54///     let mut file = LogBufFile::new(dir, file_name, max_level, format, flush_millis);
55///     if let Some(ro) = rotate {
56///         file = file.rotation(ro);
57///     }
58///     let mut config = Builder::default().signal(signal_hook::consts::SIGUSR1).buf_file(file);
59///     // panic on debugging
60///     #[cfg(debug_assertions)]
61///     {
62///         config.continue_when_panic = false;
63///     }
64///     // do not panic on release
65///     #[cfg(not(debug_assertions))]
66///     {
67///         config.continue_when_panic = true;
68///     }
69///     return config;
70/// }
71///```
72#[derive(Hash)]
73pub struct LogBufFile {
74    /// max log level in this file
75    pub level: Level,
76
77    pub format: LogFormat,
78
79    /// path: dir/name
80    pub file_path: Box<Path>,
81
82    /// default to 0, means always flush when no more message to write.
83    ///
84    /// when larger than zero, will wait for new message when timeout occur.
85    ///
86    /// Max value is 1000 (1 sec).
87    pub flush_millis: usize,
88
89    /// Rotation config
90    pub rotation: Option<Rotation>,
91
92    /// Auto flush when buffer size is reached, default to be 4k
93    pub flush_size: usize,
94}
95
96impl LogBufFile {
97    /// Construct config for file sink with buffer.
98    ///
99    /// Will try to create dir if not exists.
100    /// Periodic flush if flush_millis is zero, or
101    /// buffer size reaching 4096. will ensure a complete line write to the log file.
102    ///
103    /// # Arguments:
104    ///
105    /// The type of `dir` and `file_name` can be &str / String / &OsStr / OsString / Path / PathBuf. They can be of
106    /// different types.
107    ///
108    /// - `flush_millis`:
109    ///
110    ///    - default to 0, means always flush when no more message to write.
111    ///
112    ///    - when larger than zero, will wait for new message when timeout occur.
113    /// The max value is 1000 (1 sec).
114    pub fn new<P1, P2>(
115        dir: P1, file_name: P2, level: Level, format: LogFormat, flush_millis: usize,
116    ) -> Self
117    where
118        P1: Into<PathBuf>,
119        P2: Into<PathBuf>,
120    {
121        let dir_path: PathBuf = dir.into();
122        if !dir_path.exists() {
123            std::fs::create_dir(&dir_path).expect("create dir for log");
124        }
125        let file_path = dir_path.join(file_name.into()).into_boxed_path();
126        Self {
127            level,
128            format,
129            file_path,
130            flush_millis,
131            rotation: None,
132            flush_size: FLUSH_SIZE_DEFAULT,
133        }
134    }
135
136    pub fn rotation(mut self, ro: Rotation) -> Self {
137        self.rotation = Some(ro);
138        self
139    }
140}
141
142impl SinkConfigTrait for LogBufFile {
143    fn get_level(&self) -> Level {
144        self.level
145    }
146
147    fn get_file_path(&self) -> Option<Box<Path>> {
148        Some(self.file_path.clone())
149    }
150
151    fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
152        self.hash(hasher);
153        hasher.write(b"LogBufFile");
154    }
155
156    fn build(&self) -> LogSink {
157        LogSink::BufFile(LogSinkBufFile::new(self))
158    }
159}
160
161pub(crate) struct LogSinkBufFile {
162    max_level: Level,
163    // raw fd only valid before original File close, use ArcSwap to prevent drop while using.
164    formatter: LogFormat,
165    _th: thread::JoinHandle<()>,
166    tx: MTx<Msg>,
167}
168
169impl LogSinkBufFile {
170    pub fn new(config: &LogBufFile) -> Self {
171        let (tx, rx) = crossfire::mpsc::bounded_blocking(100);
172
173        let mut flush_millis = config.flush_millis;
174        if flush_millis == 0 || flush_millis > 1000 {
175            flush_millis = 1000;
176        }
177        let mut rotate_impl: Option<LogRotate> = None;
178        if let Some(r) = &config.rotation {
179            rotate_impl = Some(r.build(&config.file_path));
180        }
181        let mut flush_size = config.flush_size;
182        if flush_size == 0 {
183            flush_size = FLUSH_SIZE_DEFAULT;
184        }
185        let mut inner = BufFileInner {
186            size: 0,
187            create_time: None,
188            path: config.file_path.to_path_buf(),
189            f: None,
190            flush_millis,
191            flush_size,
192            buf: Vec::with_capacity(4096),
193            rotate: rotate_impl,
194        };
195        let _th = thread::spawn(move || inner.log_writer(rx));
196        Self { max_level: config.level, formatter: config.format.clone(), tx, _th }
197    }
198}
199
200impl LogSinkTrait for LogSinkBufFile {
201    fn reopen(&self) -> std::io::Result<()> {
202        let _ = self.tx.send(Msg::Reopen);
203        Ok(())
204    }
205
206    #[inline(always)]
207    fn log(&self, now: &Timer, r: &Record) {
208        if r.level() <= self.max_level {
209            // Get a stable buffer,
210            // for concurrently write to file from multi process.
211            let buf = self.formatter.process(now, r);
212            let _ = self.tx.send(Msg::Line(buf));
213        }
214    }
215
216    #[inline(always)]
217    fn flush(&self) {
218        let _ = self.tx.send(Msg::Flush(Once::new()));
219    }
220}
221
222enum Msg {
223    Line(String),
224    Reopen,
225    Flush(Once),
226}
227
228struct BufFileInner {
229    size: u64,
230    create_time: Option<SystemTime>,
231    path: PathBuf,
232    f: Option<std::fs::File>,
233    buf: Vec<u8>,
234    flush_millis: usize,
235    rotate: Option<LogRotate>,
236    flush_size: usize,
237}
238
239impl FileSinkTrait for BufFileInner {
240    #[inline(always)]
241    fn get_create_time(&self) -> SystemTime {
242        self.create_time.unwrap()
243    }
244
245    #[inline(always)]
246    fn get_size(&self) -> u64 {
247        self.size
248    }
249}
250
251impl BufFileInner {
252    fn reopen(&mut self) {
253        match open_file(&self.path) {
254            Ok(f) => {
255                let mt = metadata(&self.path).expect("get metadata");
256                self.size = mt.len();
257                if self.create_time.is_none() {
258                    // NOTE Posix has no create_time, so use mtime. rotation will delay a cycle after program restart.
259                    self.create_time = Some(mt.modified().unwrap());
260                }
261                self.f.replace(f);
262            }
263            Err(e) => {
264                eprintln!("open logfile {:#?} failed: {:?}", &self.path, e);
265            }
266        }
267    }
268
269    fn write(&mut self, mut s: Vec<u8>) {
270        if self.buf.len() + s.len() > self.flush_size {
271            if self.buf.len() > 0 {
272                self.flush(false);
273            }
274        }
275        self.buf.reserve(s.len());
276        self.buf.append(&mut s);
277        if self.buf.len() >= self.flush_size {
278            self.flush(false);
279        }
280    }
281
282    #[inline(always)]
283    fn check_rotate(&mut self) {
284        if let Some(ro) = self.rotate.as_ref() {
285            if ro.rotate(self) {
286                self.reopen();
287            }
288        }
289    }
290
291    fn flush(&mut self, wait_rotate: bool) {
292        if let Some(f) = self.f.as_ref() {
293            self.size += self.buf.len() as u64;
294            // Use unbuffered I/O to ensure the write ok
295            let _ = unsafe {
296                libc::write(
297                    f.as_raw_fd() as libc::c_int,
298                    self.buf.as_ptr() as *const libc::c_void,
299                    self.buf.len(),
300                )
301            };
302            unsafe { self.buf.set_len(0) };
303            self.check_rotate();
304        }
305        if wait_rotate {
306            if let Some(ro) = self.rotate.as_ref() {
307                ro.wait();
308            }
309        }
310    }
311
312    fn log_writer(&mut self, rx: Rx<Msg>) {
313        self.reopen();
314        self.check_rotate();
315
316        macro_rules! process {
317            ($msg: expr) => {
318                match $msg {
319                    Msg::Line(line) => {
320                        self.write(line.into());
321                    }
322                    Msg::Reopen => {
323                        self.reopen();
324                    }
325                    Msg::Flush(o) => {
326                        self.flush(true);
327                        o.call_once(|| {});
328                    }
329                }
330            };
331        }
332        if self.flush_millis > 0 {
333            loop {
334                match rx.recv_timeout(Duration::from_millis(self.flush_millis as u64)) {
335                    Ok(msg) => {
336                        process!(msg);
337                        while let Ok(msg) = rx.try_recv() {
338                            process!(msg);
339                        }
340                    }
341                    Err(RecvTimeoutError::Timeout) => {
342                        self.flush(false);
343                    }
344                    Err(RecvTimeoutError::Disconnected) => {
345                        self.flush(true);
346                        return;
347                    }
348                }
349            }
350        } else {
351            loop {
352                match rx.recv() {
353                    Ok(msg) => {
354                        process!(msg);
355                        while let Ok(msg) = rx.try_recv() {
356                            process!(msg);
357                        }
358                        self.flush(false);
359                    }
360                    Err(_) => {
361                        self.flush(true);
362                        return;
363                    }
364                }
365            }
366        }
367    }
368}