captains_log/
buf_file_impl.rs1use crate::{
2 config::{LogFormat, SinkConfigBuild, SinkConfigTrait},
3 log_impl::{LogSink, LogSinkTrait},
4 rotation::*,
5 time::Timer,
6};
7use log::{Level, Record};
8use std::fs::metadata;
9use std::hash::{Hash, Hasher};
10use std::os::unix::prelude::*;
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Once};
13use std::time::{Duration, SystemTime};
14
15use crate::file_impl::open_file;
16use crossfire::{MTx, RecvTimeoutError, Rx};
17use std::thread;
18
/// Buffer threshold in bytes used for `flush_size` by default, and as the
/// replacement whenever a configured `flush_size` is zero.
const FLUSH_SIZE_DEFAULT: usize = 4 * 1024;
22
23#[derive(Hash)]
62pub struct LogBufFile {
63 pub level: Level,
65
66 pub format: LogFormat,
67
68 pub file_path: Box<Path>,
70
71 pub flush_millis: usize,
77
78 pub rotation: Option<Rotation>,
80
81 pub flush_size: usize,
84}
85
86impl LogBufFile {
87 pub fn new<P1, P2>(
105 dir: P1, file_name: P2, level: Level, format: LogFormat, flush_millis: usize,
106 ) -> Self
107 where
108 P1: Into<PathBuf>,
109 P2: Into<PathBuf>,
110 {
111 let dir_path: PathBuf = dir.into();
112 if !dir_path.exists() {
113 std::fs::create_dir(&dir_path).expect("create dir for log");
114 }
115 let file_path = dir_path.join(file_name.into()).into_boxed_path();
116 Self {
117 level,
118 format,
119 file_path,
120 flush_millis,
121 rotation: None,
122 flush_size: FLUSH_SIZE_DEFAULT,
123 }
124 }
125
126 pub fn rotation(mut self, ro: Rotation) -> Self {
127 self.rotation = Some(ro);
128 self
129 }
130}
131
132impl SinkConfigBuild for LogBufFile {
133 fn build(&self) -> LogSink {
134 LogSink::BufFile(LogSinkBufFile::new(self))
135 }
136}
137
138impl SinkConfigTrait for LogBufFile {
139 fn get_level(&self) -> Level {
140 self.level
141 }
142
143 fn get_file_path(&self) -> Option<Box<Path>> {
144 Some(self.file_path.clone())
145 }
146
147 fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
148 self.hash(hasher);
149 hasher.write(b"LogBufFile");
150 }
151}
152
153pub(crate) struct LogSinkBufFile {
154 max_level: Level,
155 formatter: LogFormat,
157 _th: thread::JoinHandle<()>,
158 tx: MTx<Msg>,
159}
160
161impl LogSinkBufFile {
162 fn new(config: &LogBufFile) -> Self {
163 let (tx, rx) = crossfire::mpsc::bounded_blocking(100);
164
165 let mut flush_millis = config.flush_millis;
166 if flush_millis == 0 || flush_millis > 1000 {
167 flush_millis = 1000;
168 }
169 let mut rotate_impl: Option<LogRotate> = None;
170 if let Some(r) = &config.rotation {
171 rotate_impl = Some(r.build(&config.file_path));
172 }
173 let mut flush_size = config.flush_size;
174 if flush_size == 0 {
175 flush_size = FLUSH_SIZE_DEFAULT;
176 }
177 let mut inner = BufFileInner {
178 size: 0,
179 create_time: None,
180 path: config.file_path.to_path_buf(),
181 f: None,
182 flush_millis,
183 flush_size,
184 buf: Vec::with_capacity(4096),
185 rotate: rotate_impl,
186 };
187 let _th = thread::spawn(move || inner.log_writer(rx));
188 Self { max_level: config.level, formatter: config.format.clone(), tx, _th }
189 }
190}
191
192impl LogSinkTrait for LogSinkBufFile {
193 #[inline]
194 fn open(&self) -> std::io::Result<()> {
195 self.reopen()
196 }
197 fn reopen(&self) -> std::io::Result<()> {
198 let _ = self.tx.send(Msg::Reopen);
199 Ok(())
200 }
201
202 #[inline(always)]
203 fn log(&self, now: &Timer, r: &Record) {
204 if r.level() <= self.max_level {
205 let buf = self.formatter.process(now, r);
208 let _ = self.tx.send(Msg::Line(buf));
209 }
210 }
211
212 #[inline(always)]
213 fn flush(&self) {
214 let o = Arc::new(Once::new());
215 if self.tx.send(Msg::Flush(o.clone())).is_ok() {
216 o.wait();
217 }
218 }
219}
220
/// Messages sent from the sink to its writer thread.
enum Msg {
    /// A formatted log line to append to the write buffer.
    Line(String),
    /// Request to (re)open the log file.
    Reopen,
    /// Request to flush; the writer completes the `Once` when it is done so the
    /// sender can wait on it.
    Flush(Arc<Once>),
}
226
227struct BufFileInner {
228 size: u64,
229 create_time: Option<SystemTime>,
230 path: PathBuf,
231 f: Option<std::fs::File>,
232 buf: Vec<u8>,
233 flush_millis: usize,
234 rotate: Option<LogRotate>,
235 flush_size: usize,
236}
237
238impl FileSinkTrait for BufFileInner {
239 #[inline(always)]
240 fn get_create_time(&self) -> SystemTime {
241 self.create_time.unwrap()
242 }
243
244 #[inline(always)]
245 fn get_size(&self) -> u64 {
246 self.size
247 }
248}
249
250impl BufFileInner {
251 fn reopen(&mut self) {
252 match open_file(&self.path) {
253 Ok(f) => {
254 let mt = metadata(&self.path).expect("get metadata");
255 self.size = mt.len();
256 if self.create_time.is_none() {
257 self.create_time = Some(mt.modified().unwrap());
259 }
260 self.f.replace(f);
261 }
262 Err(e) => {
263 eprintln!("open logfile {:#?} failed: {:?}", &self.path, e);
264 }
265 }
266 }
267
268 fn write(&mut self, mut s: Vec<u8>) {
269 if self.buf.len() + s.len() > self.flush_size {
270 if self.buf.len() > 0 {
271 self.flush(false);
272 }
273 }
274 self.buf.reserve(s.len());
275 self.buf.append(&mut s);
276 if self.buf.len() >= self.flush_size {
277 self.flush(false);
278 }
279 }
280
281 #[inline(always)]
282 fn check_rotate(&mut self) {
283 if let Some(ro) = self.rotate.as_ref() {
284 if ro.rotate(self) {
285 self.reopen();
286 }
287 }
288 }
289
290 fn flush(&mut self, wait_rotate: bool) {
291 if let Some(f) = self.f.as_ref() {
292 self.size += self.buf.len() as u64;
293 let mut p = self.buf.as_ptr() as *const u8;
295 let mut l = self.buf.len();
296 loop {
297 let r = unsafe {
298 libc::write(f.as_raw_fd() as libc::c_int, p as *const libc::c_void, l)
299 };
300 if r == l as isize || r < 0 {
301 break;
303 }
304 l -= r as usize;
307 p = unsafe { p.add(r as usize) };
308 }
309 unsafe { self.buf.set_len(0) };
310 self.check_rotate();
311 }
312 if wait_rotate {
313 if let Some(ro) = self.rotate.as_ref() {
314 ro.wait();
315 }
316 }
317 }
318
319 fn log_writer(&mut self, rx: Rx<Msg>) {
320 self.reopen();
321 self.check_rotate();
322
323 macro_rules! process {
324 ($msg: expr) => {
325 match $msg {
326 Msg::Line(line) => {
327 self.write(line.into());
328 }
329 Msg::Reopen => {
330 self.reopen();
331 }
332 Msg::Flush(o) => {
333 self.flush(true);
334 o.call_once(|| {});
335 }
336 }
337 };
338 }
339 if self.flush_millis > 0 {
340 loop {
341 match rx.recv_timeout(Duration::from_millis(self.flush_millis as u64)) {
342 Ok(msg) => {
343 process!(msg);
344 while let Ok(msg) = rx.try_recv() {
345 process!(msg);
346 }
347 }
348 Err(RecvTimeoutError::Timeout) => {
349 self.flush(false);
350 }
351 Err(RecvTimeoutError::Disconnected) => {
352 self.flush(true);
353 return;
354 }
355 }
356 }
357 } else {
358 loop {
359 match rx.recv() {
360 Ok(msg) => {
361 process!(msg);
362 while let Ok(msg) = rx.try_recv() {
363 process!(msg);
364 }
365 self.flush(false);
366 }
367 Err(_) => {
368 self.flush(true);
369 return;
370 }
371 }
372 }
373 }
374 }
375}