captains_log/
buf_file_impl.rs1use crate::{
2 config::{LogFormat, SinkConfigTrait},
3 log_impl::{LogSink, LogSinkTrait},
4 rotation::*,
5 time::Timer,
6};
7use log::{Level, Record};
8use std::fs::metadata;
9use std::hash::{Hash, Hasher};
10use std::os::unix::prelude::*;
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Once};
13use std::time::{Duration, SystemTime};
14
15use crate::file_impl::open_file;
16use crossfire::{MTx, RecvTimeoutError, Rx};
17use std::thread;
18
19pub const FLUSH_SIZE_DEFAULT: usize = 4096;
22
23#[derive(Hash)]
62pub struct LogBufFile {
63 pub level: Level,
65
66 pub format: LogFormat,
67
68 pub file_path: Box<Path>,
70
71 pub flush_millis: usize,
77
78 pub rotation: Option<Rotation>,
80
81 pub flush_size: usize,
83}
84
85impl LogBufFile {
86 pub fn new<P1, P2>(
104 dir: P1, file_name: P2, level: Level, format: LogFormat, flush_millis: usize,
105 ) -> Self
106 where
107 P1: Into<PathBuf>,
108 P2: Into<PathBuf>,
109 {
110 let dir_path: PathBuf = dir.into();
111 if !dir_path.exists() {
112 std::fs::create_dir(&dir_path).expect("create dir for log");
113 }
114 let file_path = dir_path.join(file_name.into()).into_boxed_path();
115 Self {
116 level,
117 format,
118 file_path,
119 flush_millis,
120 rotation: None,
121 flush_size: FLUSH_SIZE_DEFAULT,
122 }
123 }
124
125 pub fn rotation(mut self, ro: Rotation) -> Self {
126 self.rotation = Some(ro);
127 self
128 }
129}
130
131impl SinkConfigTrait for LogBufFile {
132 fn get_level(&self) -> Level {
133 self.level
134 }
135
136 fn get_file_path(&self) -> Option<Box<Path>> {
137 Some(self.file_path.clone())
138 }
139
140 fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
141 self.hash(hasher);
142 hasher.write(b"LogBufFile");
143 }
144
145 fn build(&self) -> LogSink {
146 LogSink::BufFile(LogSinkBufFile::new(self))
147 }
148}
149
150pub(crate) struct LogSinkBufFile {
151 max_level: Level,
152 formatter: LogFormat,
154 _th: thread::JoinHandle<()>,
155 tx: MTx<Msg>,
156}
157
158impl LogSinkBufFile {
159 fn new(config: &LogBufFile) -> Self {
160 let (tx, rx) = crossfire::mpsc::bounded_blocking(100);
161
162 let mut flush_millis = config.flush_millis;
163 if flush_millis == 0 || flush_millis > 1000 {
164 flush_millis = 1000;
165 }
166 let mut rotate_impl: Option<LogRotate> = None;
167 if let Some(r) = &config.rotation {
168 rotate_impl = Some(r.build(&config.file_path));
169 }
170 let mut flush_size = config.flush_size;
171 if flush_size == 0 {
172 flush_size = FLUSH_SIZE_DEFAULT;
173 }
174 let mut inner = BufFileInner {
175 size: 0,
176 create_time: None,
177 path: config.file_path.to_path_buf(),
178 f: None,
179 flush_millis,
180 flush_size,
181 buf: Vec::with_capacity(4096),
182 rotate: rotate_impl,
183 };
184 let _th = thread::spawn(move || inner.log_writer(rx));
185 Self { max_level: config.level, formatter: config.format.clone(), tx, _th }
186 }
187}
188
189impl LogSinkTrait for LogSinkBufFile {
190 fn reopen(&self) -> std::io::Result<()> {
191 let _ = self.tx.send(Msg::Reopen);
192 Ok(())
193 }
194
195 #[inline(always)]
196 fn log(&self, now: &Timer, r: &Record) {
197 if r.level() <= self.max_level {
198 let buf = self.formatter.process(now, r);
201 let _ = self.tx.send(Msg::Line(buf));
202 }
203 }
204
205 #[inline(always)]
206 fn flush(&self) {
207 let o = Arc::new(Once::new());
208 if self.tx.send(Msg::Flush(o.clone())).is_ok() {
209 o.wait();
210 }
211 }
212}
213
214enum Msg {
215 Line(String),
216 Reopen,
217 Flush(Arc<Once>),
218}
219
220struct BufFileInner {
221 size: u64,
222 create_time: Option<SystemTime>,
223 path: PathBuf,
224 f: Option<std::fs::File>,
225 buf: Vec<u8>,
226 flush_millis: usize,
227 rotate: Option<LogRotate>,
228 flush_size: usize,
229}
230
231impl FileSinkTrait for BufFileInner {
232 #[inline(always)]
233 fn get_create_time(&self) -> SystemTime {
234 self.create_time.unwrap()
235 }
236
237 #[inline(always)]
238 fn get_size(&self) -> u64 {
239 self.size
240 }
241}
242
243impl BufFileInner {
244 fn reopen(&mut self) {
245 match open_file(&self.path) {
246 Ok(f) => {
247 let mt = metadata(&self.path).expect("get metadata");
248 self.size = mt.len();
249 if self.create_time.is_none() {
250 self.create_time = Some(mt.modified().unwrap());
252 }
253 self.f.replace(f);
254 }
255 Err(e) => {
256 eprintln!("open logfile {:#?} failed: {:?}", &self.path, e);
257 }
258 }
259 }
260
261 fn write(&mut self, mut s: Vec<u8>) {
262 if self.buf.len() + s.len() > self.flush_size {
263 if self.buf.len() > 0 {
264 self.flush(false);
265 }
266 }
267 self.buf.reserve(s.len());
268 self.buf.append(&mut s);
269 if self.buf.len() >= self.flush_size {
270 self.flush(false);
271 }
272 }
273
274 #[inline(always)]
275 fn check_rotate(&mut self) {
276 if let Some(ro) = self.rotate.as_ref() {
277 if ro.rotate(self) {
278 self.reopen();
279 }
280 }
281 }
282
283 fn flush(&mut self, wait_rotate: bool) {
284 if let Some(f) = self.f.as_ref() {
285 self.size += self.buf.len() as u64;
286 let _ = unsafe {
288 libc::write(
289 f.as_raw_fd() as libc::c_int,
290 self.buf.as_ptr() as *const libc::c_void,
291 self.buf.len(),
292 )
293 };
294 unsafe { self.buf.set_len(0) };
295 self.check_rotate();
296 }
297 if wait_rotate {
298 if let Some(ro) = self.rotate.as_ref() {
299 ro.wait();
300 }
301 }
302 }
303
304 fn log_writer(&mut self, rx: Rx<Msg>) {
305 self.reopen();
306 self.check_rotate();
307
308 macro_rules! process {
309 ($msg: expr) => {
310 match $msg {
311 Msg::Line(line) => {
312 self.write(line.into());
313 }
314 Msg::Reopen => {
315 self.reopen();
316 }
317 Msg::Flush(o) => {
318 self.flush(true);
319 o.call_once(|| {});
320 }
321 }
322 };
323 }
324 if self.flush_millis > 0 {
325 loop {
326 match rx.recv_timeout(Duration::from_millis(self.flush_millis as u64)) {
327 Ok(msg) => {
328 process!(msg);
329 while let Ok(msg) = rx.try_recv() {
330 process!(msg);
331 }
332 }
333 Err(RecvTimeoutError::Timeout) => {
334 self.flush(false);
335 }
336 Err(RecvTimeoutError::Disconnected) => {
337 self.flush(true);
338 return;
339 }
340 }
341 }
342 } else {
343 loop {
344 match rx.recv() {
345 Ok(msg) => {
346 process!(msg);
347 while let Ok(msg) = rx.try_recv() {
348 process!(msg);
349 }
350 self.flush(false);
351 }
352 Err(_) => {
353 self.flush(true);
354 return;
355 }
356 }
357 }
358 }
359 }
360}