captains_log/
buf_file_impl.rs

1use crate::{
2    config::{LogFormat, SinkConfigTrait},
3    log_impl::{LogSink, LogSinkTrait},
4    rotation::*,
5    time::Timer,
6};
7use log::{Level, Record};
8use std::fs::metadata;
9use std::hash::{Hash, Hasher};
10use std::os::unix::prelude::*;
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Once};
13use std::time::{Duration, SystemTime};
14
15use crate::file_impl::open_file;
16use crossfire::{MTx, RecvTimeoutError, Rx};
17use std::thread;
18
19/// Limit to 4k buf size, so that during reload or graceful restart,
20/// the line will not be break.
21pub const FLUSH_SIZE_DEFAULT: usize = 4096;
22
23/// Config for buffered file sink which merged I/O and delay flush.
24/// Optional log rotation can be configured.
25///
26/// Used when you don't have a SSD and the log is massive.
27///
28/// **When your program shutting down, should call flush to ensure the log is written to disk.**
29///
30/// ``` rust
31/// log::logger().flush();
32/// ```
33/// On panic, our panic hook will call `flush()` explicitly.
34///
35/// flush size default to be 4k to prevent line breaks on program (graceful) restart.
36///
37/// # Example
38///
39/// Source of [crate::recipe::buffered_file_logger_custom()]
40///
41/// ``` rust
42/// use captains_log::*;
43/// use std::path::{self, Path, PathBuf};
44///
45/// pub fn buffered_file_logger_custom<P: Into<PathBuf>>(
46///     file_path: P, max_level: Level, time_fmt: &'static str, format_func: FormatFunc,
47///     flush_millis: usize, rotate: Option<Rotation>,
48/// ) -> Builder {
49///     let format = LogFormat::new(time_fmt, format_func);
50///     let _file_path = file_path.into();
51///     let p = path::absolute(&_file_path).expect("path convert to absolute");
52///     let dir = p.parent().unwrap();
53///     let file_name = Path::new(p.file_name().unwrap());
54///     let mut file = LogBufFile::new(dir, file_name, max_level, format, flush_millis);
55///     if let Some(ro) = rotate {
56///         file = file.rotation(ro);
57///     }
58///     return Builder::default().signal(signal_hook::consts::SIGUSR1).buf_file(file);
59/// }
60///```
61#[derive(Hash)]
62pub struct LogBufFile {
63    /// max log level in this file
64    pub level: Level,
65
66    pub format: LogFormat,
67
68    /// path: dir/name
69    pub file_path: Box<Path>,
70
71    /// default to 0, means always flush when no more message to write.
72    ///
73    /// when larger than zero, will wait for new message when timeout occur.
74    ///
75    /// Max value is 1000 (1 sec).
76    pub flush_millis: usize,
77
78    /// Rotation config
79    pub rotation: Option<Rotation>,
80
81    /// Auto flush when buffer size is reached, default to be 4k
82    pub flush_size: usize,
83}
84
85impl LogBufFile {
86    /// Construct config for file sink with buffer.
87    ///
88    /// Will try to create dir if not exists.
89    /// Periodic flush if flush_millis is zero, or
90    /// buffer size reaching 4096. will ensure a complete line write to the log file.
91    ///
92    /// # Arguments:
93    ///
94    /// The type of `dir` and `file_name` can be &str / String / &OsStr / OsString / Path / PathBuf. They can be of
95    /// different types.
96    ///
97    /// - `flush_millis`:
98    ///
99    ///    - default to 0, means always flush when no more message to write.
100    ///
101    ///    - when larger than zero, will wait for new message when timeout occur.
102    /// The max value is 1000 (1 sec).
103    pub fn new<P1, P2>(
104        dir: P1, file_name: P2, level: Level, format: LogFormat, flush_millis: usize,
105    ) -> Self
106    where
107        P1: Into<PathBuf>,
108        P2: Into<PathBuf>,
109    {
110        let dir_path: PathBuf = dir.into();
111        if !dir_path.exists() {
112            std::fs::create_dir(&dir_path).expect("create dir for log");
113        }
114        let file_path = dir_path.join(file_name.into()).into_boxed_path();
115        Self {
116            level,
117            format,
118            file_path,
119            flush_millis,
120            rotation: None,
121            flush_size: FLUSH_SIZE_DEFAULT,
122        }
123    }
124
125    pub fn rotation(mut self, ro: Rotation) -> Self {
126        self.rotation = Some(ro);
127        self
128    }
129}
130
131impl SinkConfigTrait for LogBufFile {
132    fn get_level(&self) -> Level {
133        self.level
134    }
135
136    fn get_file_path(&self) -> Option<Box<Path>> {
137        Some(self.file_path.clone())
138    }
139
140    fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
141        self.hash(hasher);
142        hasher.write(b"LogBufFile");
143    }
144
145    fn build(&self) -> LogSink {
146        LogSink::BufFile(LogSinkBufFile::new(self))
147    }
148}
149
150pub(crate) struct LogSinkBufFile {
151    max_level: Level,
152    // raw fd only valid before original File close, use ArcSwap to prevent drop while using.
153    formatter: LogFormat,
154    _th: thread::JoinHandle<()>,
155    tx: MTx<Msg>,
156}
157
158impl LogSinkBufFile {
159    fn new(config: &LogBufFile) -> Self {
160        let (tx, rx) = crossfire::mpsc::bounded_blocking(100);
161
162        let mut flush_millis = config.flush_millis;
163        if flush_millis == 0 || flush_millis > 1000 {
164            flush_millis = 1000;
165        }
166        let mut rotate_impl: Option<LogRotate> = None;
167        if let Some(r) = &config.rotation {
168            rotate_impl = Some(r.build(&config.file_path));
169        }
170        let mut flush_size = config.flush_size;
171        if flush_size == 0 {
172            flush_size = FLUSH_SIZE_DEFAULT;
173        }
174        let mut inner = BufFileInner {
175            size: 0,
176            create_time: None,
177            path: config.file_path.to_path_buf(),
178            f: None,
179            flush_millis,
180            flush_size,
181            buf: Vec::with_capacity(4096),
182            rotate: rotate_impl,
183        };
184        let _th = thread::spawn(move || inner.log_writer(rx));
185        Self { max_level: config.level, formatter: config.format.clone(), tx, _th }
186    }
187}
188
189impl LogSinkTrait for LogSinkBufFile {
190    fn reopen(&self) -> std::io::Result<()> {
191        let _ = self.tx.send(Msg::Reopen);
192        Ok(())
193    }
194
195    #[inline(always)]
196    fn log(&self, now: &Timer, r: &Record) {
197        if r.level() <= self.max_level {
198            // Get a stable buffer,
199            // for concurrently write to file from multi process.
200            let buf = self.formatter.process(now, r);
201            let _ = self.tx.send(Msg::Line(buf));
202        }
203    }
204
205    #[inline(always)]
206    fn flush(&self) {
207        let o = Arc::new(Once::new());
208        if self.tx.send(Msg::Flush(o.clone())).is_ok() {
209            o.wait();
210        }
211    }
212}
213
214enum Msg {
215    Line(String),
216    Reopen,
217    Flush(Arc<Once>),
218}
219
220struct BufFileInner {
221    size: u64,
222    create_time: Option<SystemTime>,
223    path: PathBuf,
224    f: Option<std::fs::File>,
225    buf: Vec<u8>,
226    flush_millis: usize,
227    rotate: Option<LogRotate>,
228    flush_size: usize,
229}
230
231impl FileSinkTrait for BufFileInner {
232    #[inline(always)]
233    fn get_create_time(&self) -> SystemTime {
234        self.create_time.unwrap()
235    }
236
237    #[inline(always)]
238    fn get_size(&self) -> u64 {
239        self.size
240    }
241}
242
243impl BufFileInner {
244    fn reopen(&mut self) {
245        match open_file(&self.path) {
246            Ok(f) => {
247                let mt = metadata(&self.path).expect("get metadata");
248                self.size = mt.len();
249                if self.create_time.is_none() {
250                    // NOTE Posix has no create_time, so use mtime. rotation will delay a cycle after program restart.
251                    self.create_time = Some(mt.modified().unwrap());
252                }
253                self.f.replace(f);
254            }
255            Err(e) => {
256                eprintln!("open logfile {:#?} failed: {:?}", &self.path, e);
257            }
258        }
259    }
260
261    fn write(&mut self, mut s: Vec<u8>) {
262        if self.buf.len() + s.len() > self.flush_size {
263            if self.buf.len() > 0 {
264                self.flush(false);
265            }
266        }
267        self.buf.reserve(s.len());
268        self.buf.append(&mut s);
269        if self.buf.len() >= self.flush_size {
270            self.flush(false);
271        }
272    }
273
274    #[inline(always)]
275    fn check_rotate(&mut self) {
276        if let Some(ro) = self.rotate.as_ref() {
277            if ro.rotate(self) {
278                self.reopen();
279            }
280        }
281    }
282
283    fn flush(&mut self, wait_rotate: bool) {
284        if let Some(f) = self.f.as_ref() {
285            self.size += self.buf.len() as u64;
286            // Use unbuffered I/O to ensure the write ok
287            let _ = unsafe {
288                libc::write(
289                    f.as_raw_fd() as libc::c_int,
290                    self.buf.as_ptr() as *const libc::c_void,
291                    self.buf.len(),
292                )
293            };
294            unsafe { self.buf.set_len(0) };
295            self.check_rotate();
296        }
297        if wait_rotate {
298            if let Some(ro) = self.rotate.as_ref() {
299                ro.wait();
300            }
301        }
302    }
303
304    fn log_writer(&mut self, rx: Rx<Msg>) {
305        self.reopen();
306        self.check_rotate();
307
308        macro_rules! process {
309            ($msg: expr) => {
310                match $msg {
311                    Msg::Line(line) => {
312                        self.write(line.into());
313                    }
314                    Msg::Reopen => {
315                        self.reopen();
316                    }
317                    Msg::Flush(o) => {
318                        self.flush(true);
319                        o.call_once(|| {});
320                    }
321                }
322            };
323        }
324        if self.flush_millis > 0 {
325            loop {
326                match rx.recv_timeout(Duration::from_millis(self.flush_millis as u64)) {
327                    Ok(msg) => {
328                        process!(msg);
329                        while let Ok(msg) = rx.try_recv() {
330                            process!(msg);
331                        }
332                    }
333                    Err(RecvTimeoutError::Timeout) => {
334                        self.flush(false);
335                    }
336                    Err(RecvTimeoutError::Disconnected) => {
337                        self.flush(true);
338                        return;
339                    }
340                }
341            }
342        } else {
343            loop {
344                match rx.recv() {
345                    Ok(msg) => {
346                        process!(msg);
347                        while let Ok(msg) = rx.try_recv() {
348                            process!(msg);
349                        }
350                        self.flush(false);
351                    }
352                    Err(_) => {
353                        self.flush(true);
354                        return;
355                    }
356                }
357            }
358        }
359    }
360}