captains_log/
buf_file_impl.rs1use crate::{
2 config::{LogFormat, SinkConfigTrait},
3 log_impl::{LogSink, LogSinkTrait},
4 rotation::*,
5 time::Timer,
6};
7use log::{Level, Record};
8use std::fs::metadata;
9use std::hash::{Hash, Hasher};
10use std::os::unix::prelude::*;
11use std::path::{Path, PathBuf};
12use std::sync::Once;
13use std::time::{Duration, SystemTime};
14
15use crate::file_impl::open_file;
16use crossfire::{MTx, RecvTimeoutError, Rx};
17use std::thread;
18
19const FLUSH_SIZE: usize = 4096;
22
23#[derive(Hash)]
25pub struct LogBufFile {
26 pub level: Level,
28
29 pub format: LogFormat,
30
31 pub file_path: Box<Path>,
33
34 pub flush_millis: usize,
38
39 pub rotation: Option<Rotation>,
41}
42
43impl LogBufFile {
44 pub fn new<P1, P2>(
62 dir: P1, file_name: P2, level: Level, format: LogFormat, flush_millis: usize,
63 ) -> Self
64 where
65 P1: Into<PathBuf>,
66 P2: Into<PathBuf>,
67 {
68 let dir_path: PathBuf = dir.into();
69 if !dir_path.exists() {
70 std::fs::create_dir(&dir_path).expect("create dir for log");
71 }
72 let file_path = dir_path.join(file_name.into()).into_boxed_path();
73 Self { level, format, file_path, flush_millis, rotation: None }
74 }
75
76 pub fn rotation(mut self, ro: Rotation) -> Self {
77 self.rotation = Some(ro);
78 self
79 }
80}
81
82impl SinkConfigTrait for LogBufFile {
83 fn get_level(&self) -> Level {
84 self.level
85 }
86
87 fn get_file_path(&self) -> Option<Box<Path>> {
88 Some(self.file_path.clone())
89 }
90
91 fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
92 self.hash(hasher);
93 hasher.write(b"LogRawFile");
94 }
95
96 fn build(&self) -> LogSink {
97 LogSink::BufFile(LogSinkBufFile::new(self))
98 }
99}
100
101pub(crate) struct LogSinkBufFile {
102 max_level: Level,
103 formatter: LogFormat,
105 _th: thread::JoinHandle<()>,
106 tx: MTx<Msg>,
107}
108
109impl LogSinkBufFile {
110 pub fn new(config: &LogBufFile) -> Self {
111 let (tx, rx) = crossfire::mpsc::bounded_blocking(100);
112
113 let mut flush_millis = config.flush_millis;
114 if flush_millis == 0 || flush_millis > 1000 {
115 flush_millis = 1000;
116 }
117 let mut rotate_impl: Option<LogRotate> = None;
118 if let Some(r) = &config.rotation {
119 rotate_impl = Some(r.build(&config.file_path));
120 }
121 let mut inner = BufFileInner {
122 size: 0,
123 create_time: None,
124 path: config.file_path.to_path_buf(),
125 f: None,
126 flush_millis,
127 buf: Vec::with_capacity(4096),
128 rotate: rotate_impl,
129 };
130 let _th = thread::spawn(move || inner.log_writer(rx));
131 Self { max_level: config.level, formatter: config.format.clone(), tx, _th }
132 }
133}
134
135impl LogSinkTrait for LogSinkBufFile {
136 fn reopen(&self) -> std::io::Result<()> {
137 let _ = self.tx.send(Msg::Reopen);
138 Ok(())
139 }
140
141 #[inline(always)]
142 fn log(&self, now: &Timer, r: &Record) {
143 if r.level() <= self.max_level {
144 let buf = self.formatter.process(now, r);
147 let _ = self.tx.send(Msg::Line(buf));
148 }
149 }
150
151 #[inline(always)]
152 fn flush(&self) {
153 let _ = self.tx.send(Msg::Flush(Once::new()));
154 }
155}
156
157enum Msg {
158 Line(String),
159 Reopen,
160 Flush(Once),
161}
162
163struct BufFileInner {
164 size: u64,
165 create_time: Option<SystemTime>,
166 path: PathBuf,
167 f: Option<std::fs::File>,
168 buf: Vec<u8>,
169 flush_millis: usize,
170 rotate: Option<LogRotate>,
171}
172
173impl FileSinkTrait for BufFileInner {
174 #[inline(always)]
175 fn get_create_time(&self) -> SystemTime {
176 self.create_time.unwrap()
177 }
178
179 #[inline(always)]
180 fn get_size(&self) -> u64 {
181 self.size
182 }
183}
184
185impl BufFileInner {
186 fn reopen(&mut self) {
187 match open_file(&self.path) {
188 Ok(f) => {
189 let mt = metadata(&self.path).expect("get metadata");
190 self.size = mt.len();
191 if self.create_time.is_none() {
192 self.create_time = Some(mt.modified().unwrap());
194 }
195 self.f.replace(f);
196 }
197 Err(e) => {
198 eprintln!("open logfile {:#?} failed: {:?}", &self.path, e);
199 }
200 }
201 }
202
203 fn write(&mut self, mut s: Vec<u8>) {
204 if self.buf.len() + s.len() > FLUSH_SIZE {
205 if self.buf.len() > 0 {
206 self.flush(false);
207 }
208 }
209 self.buf.reserve(s.len());
210 self.buf.append(&mut s);
211 if self.buf.len() >= FLUSH_SIZE {
212 self.flush(false);
213 }
214 }
215
216 #[inline(always)]
217 fn check_rotate(&mut self) {
218 if let Some(ro) = self.rotate.as_ref() {
219 if ro.rotate(self) {
220 self.reopen();
221 }
222 }
223 }
224
225 fn flush(&mut self, wait_rotate: bool) {
226 if let Some(f) = self.f.as_ref() {
227 self.size += self.buf.len() as u64;
228 let _ = unsafe {
230 libc::write(
231 f.as_raw_fd() as libc::c_int,
232 self.buf.as_ptr() as *const libc::c_void,
233 self.buf.len(),
234 )
235 };
236 unsafe { self.buf.set_len(0) };
237 self.check_rotate();
238 }
239 if wait_rotate {
240 if let Some(ro) = self.rotate.as_ref() {
241 ro.wait();
242 }
243 }
244 }
245
246 fn log_writer(&mut self, rx: Rx<Msg>) {
247 self.reopen();
248 self.check_rotate();
249
250 macro_rules! process {
251 ($msg: expr) => {
252 match $msg {
253 Msg::Line(line) => {
254 self.write(line.into());
255 }
256 Msg::Reopen => {
257 self.reopen();
258 }
259 Msg::Flush(o) => {
260 self.flush(true);
261 o.call_once(|| {});
262 }
263 }
264 };
265 }
266 if self.flush_millis > 0 {
267 loop {
268 match rx.recv_timeout(Duration::from_millis(self.flush_millis as u64)) {
269 Ok(msg) => {
270 process!(msg);
271 while let Ok(msg) = rx.try_recv() {
272 process!(msg);
273 }
274 }
275 Err(RecvTimeoutError::Timeout) => {
276 self.flush(false);
277 }
278 Err(RecvTimeoutError::Disconnected) => {
279 self.flush(true);
280 return;
281 }
282 }
283 }
284 } else {
285 loop {
286 match rx.recv() {
287 Ok(msg) => {
288 process!(msg);
289 while let Ok(msg) = rx.try_recv() {
290 process!(msg);
291 }
292 self.flush(false);
293 }
294 Err(_) => {
295 self.flush(true);
296 return;
297 }
298 }
299 }
300 }
301 }
302}