captains_log/
buf_file_impl.rs

1use crate::{
2    config::{LogFormat, SinkConfigBuild, SinkConfigTrait},
3    log_impl::{LogSink, LogSinkTrait},
4    rotation::*,
5    time::Timer,
6};
7use log::{Level, Record};
8use std::fs::metadata;
9use std::hash::{Hash, Hasher};
10use std::os::unix::prelude::*;
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Once};
13use std::time::{Duration, SystemTime};
14
15use crate::file_impl::open_file;
16use crossfire::{MTx, RecvTimeoutError, Rx};
17use std::thread;
18
19/// Limit to 4k buf size, so that during reload or graceful restart,
20/// the line will not be break.
21const FLUSH_SIZE_DEFAULT: usize = 4096;
22
23/// Config for buffered file sink which merged I/O and delay flush.
24/// Optional log rotation can be configured.
25///
26/// Used when you don't have a SSD and the log is massive.
27///
28/// **When your program shutting down, should call flush to ensure the log is written to disk.**
29///
30/// ``` rust
31/// log::logger().flush();
32/// ```
33/// On panic, our panic hook will call `flush()` explicitly.
34///
35/// flush size default to be 4k to prevent line breaks on program (graceful) restart.
36///
37/// # Example
38///
39/// Source of [crate::recipe::buffered_file_logger_custom()]
40///
41/// ``` rust
42/// use captains_log::{*, rotation::*};
43/// use std::path::{self, Path, PathBuf};
44///
45/// pub fn buffered_file_logger_custom<P: Into<PathBuf>>(
46///     file_path: P, max_level: Level, time_fmt: &'static str, format_func: FormatFunc,
47///     flush_millis: usize, rotate: Option<Rotation>,
48/// ) -> Builder {
49///     let format = LogFormat::new(time_fmt, format_func);
50///     let _file_path = file_path.into();
51///     let p = path::absolute(&_file_path).expect("path convert to absolute");
52///     let dir = p.parent().unwrap();
53///     let file_name = Path::new(p.file_name().unwrap());
54///     let mut file = LogBufFile::new(dir, file_name, max_level, format, flush_millis);
55///     if let Some(ro) = rotate {
56///         file = file.rotation(ro);
57///     }
58///     return Builder::default().signal(signal_hook::consts::SIGUSR1).add_sink(file);
59/// }
60///```
61#[derive(Hash)]
62pub struct LogBufFile {
63    /// max log level in this file
64    pub level: Level,
65
66    pub format: LogFormat,
67
68    /// path: dir/name
69    pub file_path: Box<Path>,
70
71    /// default to 0, means always flush when no more message to write.
72    ///
73    /// when larger than zero, will wait for new message when timeout occur.
74    ///
75    /// Max value is 1000 (1 sec).
76    pub flush_millis: usize,
77
78    /// Rotation config
79    pub rotation: Option<Rotation>,
80
81    /// Auto flush when buffer size is reached, **default to be 4KB**,
82    /// so that during reload or graceful restart, the line will not be break.
83    pub flush_size: usize,
84}
85
86impl LogBufFile {
87    /// Construct config for file sink with buffer.
88    ///
89    /// Will try to create dir if not exists.
90    /// Periodic flush if flush_millis is zero, or
91    /// buffer size reaching 4096. will ensure a complete line write to the log file.
92    ///
93    /// # Arguments:
94    ///
95    /// The type of `dir` and `file_name` can be &str / String / &OsStr / OsString / Path / PathBuf. They can be of
96    /// different types.
97    ///
98    /// - `flush_millis`:
99    ///
100    ///    - default to 0, means always flush when no more message to write.
101    ///
102    ///    - when larger than zero, will wait for new message when timeout occur.
103    /// The max value is 1000 (1 sec).
104    pub fn new<P1, P2>(
105        dir: P1, file_name: P2, level: Level, format: LogFormat, flush_millis: usize,
106    ) -> Self
107    where
108        P1: Into<PathBuf>,
109        P2: Into<PathBuf>,
110    {
111        let dir_path: PathBuf = dir.into();
112        if !dir_path.exists() {
113            std::fs::create_dir(&dir_path).expect("create dir for log");
114        }
115        let file_path = dir_path.join(file_name.into()).into_boxed_path();
116        Self {
117            level,
118            format,
119            file_path,
120            flush_millis,
121            rotation: None,
122            flush_size: FLUSH_SIZE_DEFAULT,
123        }
124    }
125
126    pub fn rotation(mut self, ro: Rotation) -> Self {
127        self.rotation = Some(ro);
128        self
129    }
130}
131
132impl SinkConfigBuild for LogBufFile {
133    fn build(&self) -> LogSink {
134        LogSink::BufFile(LogSinkBufFile::new(self))
135    }
136}
137
138impl SinkConfigTrait for LogBufFile {
139    fn get_level(&self) -> Level {
140        self.level
141    }
142
143    fn get_file_path(&self) -> Option<Box<Path>> {
144        Some(self.file_path.clone())
145    }
146
147    fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
148        self.hash(hasher);
149        hasher.write(b"LogBufFile");
150    }
151}
152
153pub(crate) struct LogSinkBufFile {
154    max_level: Level,
155    // raw fd only valid before original File close, use ArcSwap to prevent drop while using.
156    formatter: LogFormat,
157    _th: thread::JoinHandle<()>,
158    tx: MTx<Msg>,
159}
160
161impl LogSinkBufFile {
162    fn new(config: &LogBufFile) -> Self {
163        let (tx, rx) = crossfire::mpsc::bounded_blocking(100);
164
165        let mut flush_millis = config.flush_millis;
166        if flush_millis == 0 || flush_millis > 1000 {
167            flush_millis = 1000;
168        }
169        let mut rotate_impl: Option<LogRotate> = None;
170        if let Some(r) = &config.rotation {
171            rotate_impl = Some(r.build(&config.file_path));
172        }
173        let mut flush_size = config.flush_size;
174        if flush_size == 0 {
175            flush_size = FLUSH_SIZE_DEFAULT;
176        }
177        let mut inner = BufFileInner {
178            size: 0,
179            create_time: None,
180            path: config.file_path.to_path_buf(),
181            f: None,
182            flush_millis,
183            flush_size,
184            buf: Vec::with_capacity(4096),
185            rotate: rotate_impl,
186        };
187        let _th = thread::spawn(move || inner.log_writer(rx));
188        Self { max_level: config.level, formatter: config.format.clone(), tx, _th }
189    }
190}
191
192impl LogSinkTrait for LogSinkBufFile {
193    #[inline]
194    fn open(&self) -> std::io::Result<()> {
195        self.reopen()
196    }
197    fn reopen(&self) -> std::io::Result<()> {
198        let _ = self.tx.send(Msg::Reopen);
199        Ok(())
200    }
201
202    #[inline(always)]
203    fn log(&self, now: &Timer, r: &Record) {
204        if r.level() <= self.max_level {
205            // Get a stable buffer,
206            // for concurrently write to file from multi process.
207            let buf = self.formatter.process(now, r);
208            let _ = self.tx.send(Msg::Line(buf));
209        }
210    }
211
212    #[inline(always)]
213    fn flush(&self) {
214        let o = Arc::new(Once::new());
215        if self.tx.send(Msg::Flush(o.clone())).is_ok() {
216            o.wait();
217        }
218    }
219}
220
221enum Msg {
222    Line(String),
223    Reopen,
224    Flush(Arc<Once>),
225}
226
227struct BufFileInner {
228    size: u64,
229    create_time: Option<SystemTime>,
230    path: PathBuf,
231    f: Option<std::fs::File>,
232    buf: Vec<u8>,
233    flush_millis: usize,
234    rotate: Option<LogRotate>,
235    flush_size: usize,
236}
237
238impl FileSinkTrait for BufFileInner {
239    #[inline(always)]
240    fn get_create_time(&self) -> SystemTime {
241        self.create_time.unwrap()
242    }
243
244    #[inline(always)]
245    fn get_size(&self) -> u64 {
246        self.size
247    }
248}
249
250impl BufFileInner {
251    fn reopen(&mut self) {
252        match open_file(&self.path) {
253            Ok(f) => {
254                let mt = metadata(&self.path).expect("get metadata");
255                self.size = mt.len();
256                if self.create_time.is_none() {
257                    // NOTE Posix has no create_time, so use mtime. rotation will delay a cycle after program restart.
258                    self.create_time = Some(mt.modified().unwrap());
259                }
260                self.f.replace(f);
261            }
262            Err(e) => {
263                eprintln!("open logfile {:#?} failed: {:?}", &self.path, e);
264            }
265        }
266    }
267
268    fn write(&mut self, mut s: Vec<u8>) {
269        if self.buf.len() + s.len() > self.flush_size {
270            if self.buf.len() > 0 {
271                self.flush(false);
272            }
273        }
274        self.buf.reserve(s.len());
275        self.buf.append(&mut s);
276        if self.buf.len() >= self.flush_size {
277            self.flush(false);
278        }
279    }
280
281    #[inline(always)]
282    fn check_rotate(&mut self) {
283        if let Some(ro) = self.rotate.as_ref() {
284            if ro.rotate(self) {
285                self.reopen();
286            }
287        }
288    }
289
290    fn flush(&mut self, wait_rotate: bool) {
291        if let Some(f) = self.f.as_ref() {
292            self.size += self.buf.len() as u64;
293            // Use unbuffered I/O to ensure the write ok
294            let mut p = self.buf.as_ptr() as *const u8;
295            let mut l = self.buf.len();
296            loop {
297                let r = unsafe {
298                    libc::write(f.as_raw_fd() as libc::c_int, p as *const libc::c_void, l)
299                };
300                if r == l as isize || r < 0 {
301                    // Ignore write error (disk err, space err), should not affect the program
302                    break;
303                }
304                // NOTE: If early return happens, means you are using a filesystem not
305                // supporting atomic append
306                l -= r as usize;
307                p = unsafe { p.add(r as usize) };
308            }
309            unsafe { self.buf.set_len(0) };
310            self.check_rotate();
311        }
312        if wait_rotate {
313            if let Some(ro) = self.rotate.as_ref() {
314                ro.wait();
315            }
316        }
317    }
318
319    fn log_writer(&mut self, rx: Rx<Msg>) {
320        self.reopen();
321        self.check_rotate();
322
323        macro_rules! process {
324            ($msg: expr) => {
325                match $msg {
326                    Msg::Line(line) => {
327                        self.write(line.into());
328                    }
329                    Msg::Reopen => {
330                        self.reopen();
331                    }
332                    Msg::Flush(o) => {
333                        self.flush(true);
334                        o.call_once(|| {});
335                    }
336                }
337            };
338        }
339        if self.flush_millis > 0 {
340            loop {
341                match rx.recv_timeout(Duration::from_millis(self.flush_millis as u64)) {
342                    Ok(msg) => {
343                        process!(msg);
344                        while let Ok(msg) = rx.try_recv() {
345                            process!(msg);
346                        }
347                    }
348                    Err(RecvTimeoutError::Timeout) => {
349                        self.flush(false);
350                    }
351                    Err(RecvTimeoutError::Disconnected) => {
352                        self.flush(true);
353                        return;
354                    }
355                }
356            }
357        } else {
358            loop {
359                match rx.recv() {
360                    Ok(msg) => {
361                        process!(msg);
362                        while let Ok(msg) = rx.try_recv() {
363                            process!(msg);
364                        }
365                        self.flush(false);
366                    }
367                    Err(_) => {
368                        self.flush(true);
369                        return;
370                    }
371                }
372            }
373        }
374    }
375}