captains_log/
buf_file_impl.rs1use crate::{
2 config::{LogFormat, SinkConfigBuild, SinkConfigTrait},
3 log_impl::{LogSink, LogSinkTrait},
4 rotation::*,
5 time::Timer,
6};
7use log::{Level, Record};
8use std::fs::metadata;
9use std::hash::{Hash, Hasher};
10use std::os::unix::prelude::*;
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Once};
13use std::time::{Duration, SystemTime};
14
15use crate::file_impl::open_file;
16use crossfire::{mpsc, MTx, RecvTimeoutError, Rx};
17use std::thread;
18
/// Fallback flush threshold in bytes (4 KiB), used when a configuration
/// supplies a `flush_size` of zero.
const FLUSH_SIZE_DEFAULT: usize = 4 * 1024;
22
23#[derive(Hash)]
62pub struct LogBufFile {
63 pub level: Level,
65
66 pub format: LogFormat,
67
68 pub file_path: Box<Path>,
70
71 pub flush_millis: usize,
77
78 pub rotation: Option<Rotation>,
80
81 pub flush_size: usize,
84}
85
86impl LogBufFile {
87 pub fn new<P1, P2>(
105 dir: P1, file_name: P2, level: Level, format: LogFormat, flush_millis: usize,
106 ) -> Self
107 where
108 P1: Into<PathBuf>,
109 P2: Into<PathBuf>,
110 {
111 let dir_path: PathBuf = dir.into();
112 if !dir_path.exists() {
113 std::fs::create_dir(&dir_path).expect("create dir for log");
114 }
115 let file_path = dir_path.join(file_name.into()).into_boxed_path();
116 Self {
117 level,
118 format,
119 file_path,
120 flush_millis,
121 rotation: None,
122 flush_size: FLUSH_SIZE_DEFAULT,
123 }
124 }
125
126 pub fn rotation(mut self, ro: Rotation) -> Self {
127 self.rotation = Some(ro);
128 self
129 }
130}
131
132impl SinkConfigBuild for LogBufFile {
133 fn build(&self) -> LogSink {
134 LogSink::BufFile(LogSinkBufFile::new(self))
135 }
136}
137
138impl SinkConfigTrait for LogBufFile {
139 fn get_level(&self) -> Level {
140 self.level
141 }
142
143 fn get_file_path(&self) -> Option<Box<Path>> {
144 Some(self.file_path.clone())
145 }
146
147 fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
148 self.hash(hasher);
149 hasher.write(b"LogBufFile");
150 }
151}
152
153pub(crate) struct LogSinkBufFile {
154 max_level: Level,
155 formatter: LogFormat,
157 _th: thread::JoinHandle<()>,
158 tx: MTx<mpsc::Array<Msg>>,
159}
160
161impl LogSinkBufFile {
162 fn new(config: &LogBufFile) -> Self {
163 let (tx, rx) = mpsc::bounded_blocking(1024);
164
165 let mut flush_millis = config.flush_millis;
166 if flush_millis == 0 || flush_millis > 1000 {
167 flush_millis = 1000;
168 }
169 let mut rotate_impl: Option<LogRotate> = None;
170 if let Some(r) = &config.rotation {
171 rotate_impl = Some(r.build(&config.file_path));
172 }
173 let mut flush_size = config.flush_size;
174 if flush_size == 0 {
175 flush_size = FLUSH_SIZE_DEFAULT;
176 }
177 let mut inner = BufFileInner {
178 size: 0,
179 create_time: None,
180 path: config.file_path.to_path_buf(),
181 f: None,
182 flush_millis,
183 flush_size,
184 buf: Vec::with_capacity(4096),
185 rotate: rotate_impl,
186 };
187 let _th = thread::spawn(move || inner.log_writer(rx));
188 Self { max_level: config.level, formatter: config.format.clone(), tx, _th }
189 }
190}
191
192impl LogSinkTrait for LogSinkBufFile {
193 #[inline]
194 fn open(&self) -> std::io::Result<()> {
195 self.reopen()
196 }
197 fn reopen(&self) -> std::io::Result<()> {
198 let _ = self.tx.send(Msg::Reopen);
199 Ok(())
200 }
201
202 #[inline(always)]
203 fn log(&self, now: &Timer, r: &Record) {
204 if r.level() <= self.max_level {
205 let buf = self.formatter.process(now, r);
208 let _ = self.tx.send(Msg::Line(buf));
209 }
210 }
211
212 #[inline(always)]
213 fn flush(&self) {
214 let o = Arc::new(Once::new());
215 if self.tx.send(Msg::Flush(o.clone())).is_ok() {
216 o.wait();
217 }
218 }
219}
220
/// Requests sent from the sink to the writer thread.
enum Msg {
    /// A formatted log line to append to the buffer.
    Line(String),
    /// Reopen the log file.
    Reopen,
    /// Flush the buffer, then complete the `Once` so the requester can resume.
    Flush(Arc<Once>),
}
226
227struct BufFileInner {
228 size: u64,
229 create_time: Option<SystemTime>,
230 path: PathBuf,
231 f: Option<std::fs::File>,
232 buf: Vec<u8>,
233 flush_millis: usize,
234 rotate: Option<LogRotate>,
235 flush_size: usize,
236}
237
238impl FileSinkTrait for BufFileInner {
239 #[inline(always)]
240 fn get_create_time(&self) -> SystemTime {
241 self.create_time.unwrap()
242 }
243
244 #[inline(always)]
245 fn get_size(&self) -> u64 {
246 self.size
247 }
248}
249
250impl BufFileInner {
251 fn reopen(&mut self) {
252 match open_file(&self.path) {
253 Ok(f) => {
254 let mt = metadata(&self.path).expect("get metadata");
255 self.size = mt.len();
256 if self.create_time.is_none() {
257 self.create_time = Some(mt.modified().unwrap());
259 }
260 self.f.replace(f);
261 }
262 Err(e) => {
263 eprintln!("open logfile {:#?} failed: {:?}", &self.path, e);
264 }
265 }
266 }
267
268 fn write(&mut self, mut s: Vec<u8>) {
269 if self.buf.len() + s.len() > self.flush_size && !self.buf.is_empty() {
270 self.flush(false);
271 }
272 self.buf.reserve(s.len());
273 self.buf.append(&mut s);
274 if self.buf.len() >= self.flush_size {
275 self.flush(false);
276 }
277 }
278
279 #[inline(always)]
280 fn check_rotate(&mut self) {
281 if let Some(ro) = self.rotate.as_ref() {
282 if ro.rotate(self) {
283 self.reopen();
284 }
285 }
286 }
287
288 fn flush(&mut self, wait_rotate: bool) {
289 if let Some(f) = self.f.as_ref() {
290 self.size += self.buf.len() as u64;
291 let mut p = self.buf.as_ptr();
293 let mut l = self.buf.len();
294 loop {
295 let r = unsafe {
296 libc::write(f.as_raw_fd() as libc::c_int, p as *const libc::c_void, l)
297 };
298 if r == l as isize || r < 0 {
299 break;
301 }
302 l -= r as usize;
305 p = unsafe { p.add(r as usize) };
306 }
307 unsafe { self.buf.set_len(0) };
308 self.check_rotate();
309 }
310 if wait_rotate {
311 if let Some(ro) = self.rotate.as_ref() {
312 ro.wait();
313 }
314 }
315 }
316
317 fn log_writer(&mut self, rx: Rx<mpsc::Array<Msg>>) {
318 self.reopen();
319 self.check_rotate();
320
321 macro_rules! process {
322 ($msg: expr) => {
323 match $msg {
324 Msg::Line(line) => {
325 self.write(line.into());
326 }
327 Msg::Reopen => {
328 self.reopen();
329 }
330 Msg::Flush(o) => {
331 self.flush(true);
332 o.call_once(|| {});
333 }
334 }
335 };
336 }
337 if self.flush_millis > 0 {
338 loop {
339 match rx.recv_timeout(Duration::from_millis(self.flush_millis as u64)) {
340 Ok(msg) => {
341 process!(msg);
342 while let Ok(msg) = rx.try_recv() {
343 process!(msg);
344 }
345 }
346 Err(RecvTimeoutError::Timeout) => {
347 self.flush(false);
348 }
349 Err(RecvTimeoutError::Disconnected) => {
350 self.flush(true);
351 return;
352 }
353 }
354 }
355 } else {
356 loop {
357 match rx.recv() {
358 Ok(msg) => {
359 process!(msg);
360 while let Ok(msg) = rx.try_recv() {
361 process!(msg);
362 }
363 self.flush(false);
364 }
365 Err(_) => {
366 self.flush(true);
367 return;
368 }
369 }
370 }
371 }
372 }
373}