captains_log/
buf_file_impl.rs1use crate::{
2 config::{LogFormat, SinkConfigTrait},
3 log_impl::{LogSink, LogSinkTrait},
4 rotation::*,
5 time::Timer,
6};
7use log::{Level, Record};
8use std::fs::metadata;
9use std::hash::{Hash, Hasher};
10use std::os::unix::prelude::*;
11use std::path::{Path, PathBuf};
12use std::sync::Once;
13use std::time::{Duration, SystemTime};
14
15use crate::file_impl::open_file;
16use crossfire::{MTx, RecvTimeoutError, Rx};
17use std::thread;
18
19pub const FLUSH_SIZE_DEFAULT: usize = 4096;
22
23#[derive(Hash)]
73pub struct LogBufFile {
74 pub level: Level,
76
77 pub format: LogFormat,
78
79 pub file_path: Box<Path>,
81
82 pub flush_millis: usize,
88
89 pub rotation: Option<Rotation>,
91
92 pub flush_size: usize,
94}
95
96impl LogBufFile {
97 pub fn new<P1, P2>(
115 dir: P1, file_name: P2, level: Level, format: LogFormat, flush_millis: usize,
116 ) -> Self
117 where
118 P1: Into<PathBuf>,
119 P2: Into<PathBuf>,
120 {
121 let dir_path: PathBuf = dir.into();
122 if !dir_path.exists() {
123 std::fs::create_dir(&dir_path).expect("create dir for log");
124 }
125 let file_path = dir_path.join(file_name.into()).into_boxed_path();
126 Self {
127 level,
128 format,
129 file_path,
130 flush_millis,
131 rotation: None,
132 flush_size: FLUSH_SIZE_DEFAULT,
133 }
134 }
135
136 pub fn rotation(mut self, ro: Rotation) -> Self {
137 self.rotation = Some(ro);
138 self
139 }
140}
141
142impl SinkConfigTrait for LogBufFile {
143 fn get_level(&self) -> Level {
144 self.level
145 }
146
147 fn get_file_path(&self) -> Option<Box<Path>> {
148 Some(self.file_path.clone())
149 }
150
151 fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
152 self.hash(hasher);
153 hasher.write(b"LogBufFile");
154 }
155
156 fn build(&self) -> LogSink {
157 LogSink::BufFile(LogSinkBufFile::new(self))
158 }
159}
160
161pub(crate) struct LogSinkBufFile {
162 max_level: Level,
163 formatter: LogFormat,
165 _th: thread::JoinHandle<()>,
166 tx: MTx<Msg>,
167}
168
169impl LogSinkBufFile {
170 pub fn new(config: &LogBufFile) -> Self {
171 let (tx, rx) = crossfire::mpsc::bounded_blocking(100);
172
173 let mut flush_millis = config.flush_millis;
174 if flush_millis == 0 || flush_millis > 1000 {
175 flush_millis = 1000;
176 }
177 let mut rotate_impl: Option<LogRotate> = None;
178 if let Some(r) = &config.rotation {
179 rotate_impl = Some(r.build(&config.file_path));
180 }
181 let mut flush_size = config.flush_size;
182 if flush_size == 0 {
183 flush_size = FLUSH_SIZE_DEFAULT;
184 }
185 let mut inner = BufFileInner {
186 size: 0,
187 create_time: None,
188 path: config.file_path.to_path_buf(),
189 f: None,
190 flush_millis,
191 flush_size,
192 buf: Vec::with_capacity(4096),
193 rotate: rotate_impl,
194 };
195 let _th = thread::spawn(move || inner.log_writer(rx));
196 Self { max_level: config.level, formatter: config.format.clone(), tx, _th }
197 }
198}
199
200impl LogSinkTrait for LogSinkBufFile {
201 fn reopen(&self) -> std::io::Result<()> {
202 let _ = self.tx.send(Msg::Reopen);
203 Ok(())
204 }
205
206 #[inline(always)]
207 fn log(&self, now: &Timer, r: &Record) {
208 if r.level() <= self.max_level {
209 let buf = self.formatter.process(now, r);
212 let _ = self.tx.send(Msg::Line(buf));
213 }
214 }
215
216 #[inline(always)]
217 fn flush(&self) {
218 let _ = self.tx.send(Msg::Flush(Once::new()));
219 }
220}
221
222enum Msg {
223 Line(String),
224 Reopen,
225 Flush(Once),
226}
227
228struct BufFileInner {
229 size: u64,
230 create_time: Option<SystemTime>,
231 path: PathBuf,
232 f: Option<std::fs::File>,
233 buf: Vec<u8>,
234 flush_millis: usize,
235 rotate: Option<LogRotate>,
236 flush_size: usize,
237}
238
239impl FileSinkTrait for BufFileInner {
240 #[inline(always)]
241 fn get_create_time(&self) -> SystemTime {
242 self.create_time.unwrap()
243 }
244
245 #[inline(always)]
246 fn get_size(&self) -> u64 {
247 self.size
248 }
249}
250
251impl BufFileInner {
252 fn reopen(&mut self) {
253 match open_file(&self.path) {
254 Ok(f) => {
255 let mt = metadata(&self.path).expect("get metadata");
256 self.size = mt.len();
257 if self.create_time.is_none() {
258 self.create_time = Some(mt.modified().unwrap());
260 }
261 self.f.replace(f);
262 }
263 Err(e) => {
264 eprintln!("open logfile {:#?} failed: {:?}", &self.path, e);
265 }
266 }
267 }
268
269 fn write(&mut self, mut s: Vec<u8>) {
270 if self.buf.len() + s.len() > self.flush_size {
271 if self.buf.len() > 0 {
272 self.flush(false);
273 }
274 }
275 self.buf.reserve(s.len());
276 self.buf.append(&mut s);
277 if self.buf.len() >= self.flush_size {
278 self.flush(false);
279 }
280 }
281
282 #[inline(always)]
283 fn check_rotate(&mut self) {
284 if let Some(ro) = self.rotate.as_ref() {
285 if ro.rotate(self) {
286 self.reopen();
287 }
288 }
289 }
290
291 fn flush(&mut self, wait_rotate: bool) {
292 if let Some(f) = self.f.as_ref() {
293 self.size += self.buf.len() as u64;
294 let _ = unsafe {
296 libc::write(
297 f.as_raw_fd() as libc::c_int,
298 self.buf.as_ptr() as *const libc::c_void,
299 self.buf.len(),
300 )
301 };
302 unsafe { self.buf.set_len(0) };
303 self.check_rotate();
304 }
305 if wait_rotate {
306 if let Some(ro) = self.rotate.as_ref() {
307 ro.wait();
308 }
309 }
310 }
311
312 fn log_writer(&mut self, rx: Rx<Msg>) {
313 self.reopen();
314 self.check_rotate();
315
316 macro_rules! process {
317 ($msg: expr) => {
318 match $msg {
319 Msg::Line(line) => {
320 self.write(line.into());
321 }
322 Msg::Reopen => {
323 self.reopen();
324 }
325 Msg::Flush(o) => {
326 self.flush(true);
327 o.call_once(|| {});
328 }
329 }
330 };
331 }
332 if self.flush_millis > 0 {
333 loop {
334 match rx.recv_timeout(Duration::from_millis(self.flush_millis as u64)) {
335 Ok(msg) => {
336 process!(msg);
337 while let Ok(msg) = rx.try_recv() {
338 process!(msg);
339 }
340 }
341 Err(RecvTimeoutError::Timeout) => {
342 self.flush(false);
343 }
344 Err(RecvTimeoutError::Disconnected) => {
345 self.flush(true);
346 return;
347 }
348 }
349 }
350 } else {
351 loop {
352 match rx.recv() {
353 Ok(msg) => {
354 process!(msg);
355 while let Ok(msg) = rx.try_recv() {
356 process!(msg);
357 }
358 self.flush(false);
359 }
360 Err(_) => {
361 self.flush(true);
362 return;
363 }
364 }
365 }
366 }
367 }
368}