captains_log/
buf_file_impl.rs

1use crate::{
2    config::{LogFormat, SinkConfigTrait},
3    log_impl::{LogSink, LogSinkTrait},
4    rotation::*,
5    time::Timer,
6};
7use log::{Level, Record};
8use std::fs::metadata;
9use std::hash::{Hash, Hasher};
10use std::os::unix::prelude::*;
11use std::path::{Path, PathBuf};
12use std::sync::Once;
13use std::time::{Duration, SystemTime};
14
15use crate::file_impl::open_file;
16use crossfire::{MTx, RecvTimeoutError, Rx};
17use std::thread;
18
19/// Limit to 4k buf size, so that during reload or graceful restart,
20/// the line will not be break.
21const FLUSH_SIZE: usize = 4096;
22
23/// Config for buffered file sink which merged I/O and delay flush
24#[derive(Hash)]
25pub struct LogBufFile {
26    /// max log level in this file
27    pub level: Level,
28
29    pub format: LogFormat,
30
31    /// path: dir/name
32    pub file_path: Box<Path>,
33
34    /// default to 0, means always flush when no more message to write.
35    ///
36    /// when larger than zero, will wait for new message when timeout occur.
37    pub flush_millis: usize,
38
39    /// Rotation config
40    pub rotation: Option<Rotation>,
41}
42
43impl LogBufFile {
44    /// Construct config for file sink with buffer.
45    ///
46    /// Will try to create dir if not exists.
47    /// Periodic flush if flush_millis is zero, or
48    /// buffer size reaching 4096. will ensure a complete line write to the log file.
49    ///
50    /// # Arguments:
51    ///
52    /// The type of `dir` and `file_name` can be &str / String / &OsStr / OsString / Path / PathBuf. They can be of
53    /// different types.
54    ///
55    /// - `flush_millis`:
56    ///
57    ///    - default to 0, means always flush when no more message to write.
58    ///
59    ///    - when larger than zero, will wait for new message when timeout occur.
60    /// The max value is 1000 (1 sec).
61    pub fn new<P1, P2>(
62        dir: P1, file_name: P2, level: Level, format: LogFormat, flush_millis: usize,
63    ) -> Self
64    where
65        P1: Into<PathBuf>,
66        P2: Into<PathBuf>,
67    {
68        let dir_path: PathBuf = dir.into();
69        if !dir_path.exists() {
70            std::fs::create_dir(&dir_path).expect("create dir for log");
71        }
72        let file_path = dir_path.join(file_name.into()).into_boxed_path();
73        Self { level, format, file_path, flush_millis, rotation: None }
74    }
75
76    pub fn rotation(mut self, ro: Rotation) -> Self {
77        self.rotation = Some(ro);
78        self
79    }
80}
81
82impl SinkConfigTrait for LogBufFile {
83    fn get_level(&self) -> Level {
84        self.level
85    }
86
87    fn get_file_path(&self) -> Option<Box<Path>> {
88        Some(self.file_path.clone())
89    }
90
91    fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
92        self.hash(hasher);
93        hasher.write(b"LogRawFile");
94    }
95
96    fn build(&self) -> LogSink {
97        LogSink::BufFile(LogSinkBufFile::new(self))
98    }
99}
100
101pub(crate) struct LogSinkBufFile {
102    max_level: Level,
103    // raw fd only valid before original File close, use ArcSwap to prevent drop while using.
104    formatter: LogFormat,
105    _th: thread::JoinHandle<()>,
106    tx: MTx<Msg>,
107}
108
109impl LogSinkBufFile {
110    pub fn new(config: &LogBufFile) -> Self {
111        let (tx, rx) = crossfire::mpsc::bounded_blocking(100);
112
113        let mut flush_millis = config.flush_millis;
114        if flush_millis == 0 || flush_millis > 1000 {
115            flush_millis = 1000;
116        }
117        let mut rotate_impl: Option<LogRotate> = None;
118        if let Some(r) = &config.rotation {
119            rotate_impl = Some(r.build(&config.file_path));
120        }
121        let mut inner = BufFileInner {
122            size: 0,
123            create_time: None,
124            path: config.file_path.to_path_buf(),
125            f: None,
126            flush_millis,
127            buf: Vec::with_capacity(4096),
128            rotate: rotate_impl,
129        };
130        let _th = thread::spawn(move || inner.log_writer(rx));
131        Self { max_level: config.level, formatter: config.format.clone(), tx, _th }
132    }
133}
134
135impl LogSinkTrait for LogSinkBufFile {
136    fn reopen(&self) -> std::io::Result<()> {
137        let _ = self.tx.send(Msg::Reopen);
138        Ok(())
139    }
140
141    #[inline(always)]
142    fn log(&self, now: &Timer, r: &Record) {
143        if r.level() <= self.max_level {
144            // Get a stable buffer,
145            // for concurrently write to file from multi process.
146            let buf = self.formatter.process(now, r);
147            let _ = self.tx.send(Msg::Line(buf));
148        }
149    }
150
151    #[inline(always)]
152    fn flush(&self) {
153        let _ = self.tx.send(Msg::Flush(Once::new()));
154    }
155}
156
157enum Msg {
158    Line(String),
159    Reopen,
160    Flush(Once),
161}
162
163struct BufFileInner {
164    size: u64,
165    create_time: Option<SystemTime>,
166    path: PathBuf,
167    f: Option<std::fs::File>,
168    buf: Vec<u8>,
169    flush_millis: usize,
170    rotate: Option<LogRotate>,
171}
172
173impl FileSinkTrait for BufFileInner {
174    #[inline(always)]
175    fn get_create_time(&self) -> SystemTime {
176        self.create_time.unwrap()
177    }
178
179    #[inline(always)]
180    fn get_size(&self) -> u64 {
181        self.size
182    }
183}
184
185impl BufFileInner {
186    fn reopen(&mut self) {
187        match open_file(&self.path) {
188            Ok(f) => {
189                let mt = metadata(&self.path).expect("get metadata");
190                self.size = mt.len();
191                if self.create_time.is_none() {
192                    // NOTE Posix has no create_time, so use mtime. rotation will delay a cycle after program restart.
193                    self.create_time = Some(mt.modified().unwrap());
194                }
195                self.f.replace(f);
196            }
197            Err(e) => {
198                eprintln!("open logfile {:#?} failed: {:?}", &self.path, e);
199            }
200        }
201    }
202
203    fn write(&mut self, mut s: Vec<u8>) {
204        if self.buf.len() + s.len() > FLUSH_SIZE {
205            if self.buf.len() > 0 {
206                self.flush(false);
207            }
208        }
209        self.buf.reserve(s.len());
210        self.buf.append(&mut s);
211        if self.buf.len() >= FLUSH_SIZE {
212            self.flush(false);
213        }
214    }
215
216    #[inline(always)]
217    fn check_rotate(&mut self) {
218        if let Some(ro) = self.rotate.as_ref() {
219            if ro.rotate(self) {
220                self.reopen();
221            }
222        }
223    }
224
225    fn flush(&mut self, wait_rotate: bool) {
226        if let Some(f) = self.f.as_ref() {
227            self.size += self.buf.len() as u64;
228            // Use unbuffered I/O to ensure the write ok
229            let _ = unsafe {
230                libc::write(
231                    f.as_raw_fd() as libc::c_int,
232                    self.buf.as_ptr() as *const libc::c_void,
233                    self.buf.len(),
234                )
235            };
236            unsafe { self.buf.set_len(0) };
237            self.check_rotate();
238        }
239        if wait_rotate {
240            if let Some(ro) = self.rotate.as_ref() {
241                ro.wait();
242            }
243        }
244    }
245
246    fn log_writer(&mut self, rx: Rx<Msg>) {
247        self.reopen();
248        self.check_rotate();
249
250        macro_rules! process {
251            ($msg: expr) => {
252                match $msg {
253                    Msg::Line(line) => {
254                        self.write(line.into());
255                    }
256                    Msg::Reopen => {
257                        self.reopen();
258                    }
259                    Msg::Flush(o) => {
260                        self.flush(true);
261                        o.call_once(|| {});
262                    }
263                }
264            };
265        }
266        if self.flush_millis > 0 {
267            loop {
268                match rx.recv_timeout(Duration::from_millis(self.flush_millis as u64)) {
269                    Ok(msg) => {
270                        process!(msg);
271                        while let Ok(msg) = rx.try_recv() {
272                            process!(msg);
273                        }
274                    }
275                    Err(RecvTimeoutError::Timeout) => {
276                        self.flush(false);
277                    }
278                    Err(RecvTimeoutError::Disconnected) => {
279                        self.flush(true);
280                        return;
281                    }
282                }
283            }
284        } else {
285            loop {
286                match rx.recv() {
287                    Ok(msg) => {
288                        process!(msg);
289                        while let Ok(msg) = rx.try_recv() {
290                            process!(msg);
291                        }
292                        self.flush(false);
293                    }
294                    Err(_) => {
295                        self.flush(true);
296                        return;
297                    }
298                }
299            }
300        }
301    }
302}