venom_log/plugin/
file_split.rs

1/*
2 * @Author: BuddyCoder
3 * @Date: 2022-03-30 10:24:52
4 * @LastEditTime: 2022-03-30 14:55:54
5 * @LastEditors: BuddyCoder
6 * @Description: 
7 * @FilePath: /venom_log/src/plugin/file_split.rs
8 * MIT
9 */
10use std::cell::RefCell;
11use std::fs::{ DirEntry, File, OpenOptions};
12use std::io::{ Seek, SeekFrom, Write};
13
14use chrono::{Local, NaiveDateTime};
15
16use crate::appender::{Command, VenomLogRecord, LogAppender};
17use crate::consts::LogSize;
18use std::ops::Sub;
19use std::time::Duration;
20use crate::error::LogError;
21use crate::{chan, Receiver, Sender};
22
23/// .zip or .lz4 or any one packer
24pub trait Packer: Send {
25    fn pack_name(&self) -> &'static str;
26    //return bool: remove_log_file
27    fn do_pack(&self, log_file: File, log_file_path: &str) -> Result<bool, LogError>;
28    /// default 0 is not retry pack. if retry > 0 ,it will trying rePack
29    fn retry(&self) -> i32 { return 0; }
30}
31
32/// split log file allow compress log 拆分日志文件允许压缩日志
33pub struct FileSplitAppender {
34    cell: RefCell<FileSplitAppenderData>,
35}
36
37///log data pack 日志数据包
38pub struct LogPack {
39    pub dir: String,
40    pub rolling: RollingType,
41    pub new_log_name: String,
42}
43
44///rolling keep type 滚动保存的类型
45#[derive(Copy, Clone, Debug)]
46pub enum RollingType {
47    /// keep All of log packs
48    All,
49    /// keep by Time Duration,
50    /// for example:
51    /// // keep one day log pack
52    /// (Duration::from_secs(24 * 3600))
53    KeepTime(Duration),
54    /// keep log pack num(.log,.zip.lz4...more)
55    KeepNum(i64),
56}
57
58impl RollingType {
59    fn read_paths(&self, dir: &str, temp_name: &str) -> Vec<DirEntry> {
60        let paths = std::fs::read_dir(dir);
61        if let Ok(paths) = paths {
62            let mut paths_vec = vec![];
63            for path in paths {
64                match path {
65                    Ok(path) => {
66                        if let Some(v) = path.file_name().to_str() {
67                            //filter temp.log and not start with temp
68                            if (v.ends_with(".log") && v.trim_end_matches(".log").ends_with(temp_name)) || !v.starts_with(temp_name) {
69                                continue;
70                            }
71                        }
72                        paths_vec.push(path);
73                    }
74                    _ => {}
75                }
76            }
77            paths_vec.sort_by(|a, b| b.file_name().cmp(&a.file_name()));
78            return paths_vec;
79        }
80        return vec![];
81    }
82
83    pub fn do_rolling(&self, temp_name: &str, dir: &str) {
84        match self {
85            RollingType::KeepNum(n) => {
86                let paths_vec = self.read_paths(dir, temp_name);
87                for index in 0..paths_vec.len() {
88                    if index >= *n as usize {
89                        let item = &paths_vec[index];
90                        std::fs::remove_file(item.path()).unwrap();
91                    }
92                }
93            }
94            RollingType::KeepTime(t) => {
95                let paths_vec = self.read_paths(dir, temp_name);
96                let duration = chrono::Duration::from_std(t.clone());
97                if duration.is_err() {
98                    return;
99                }
100                let duration = duration.unwrap();
101                let now = Local::now().naive_local();
102                for index in 0..paths_vec.len() {
103                    let item = &paths_vec[index];
104                    let file_name = item.file_name();
105                    let name = file_name.to_str().unwrap_or("").to_string();
106                    if let Some(time) = self.file_name_parse_time(&name, temp_name) {
107                        if now.sub(time) > duration {
108                            std::fs::remove_file(item.path()).unwrap();
109                        }
110                    }
111                }
112            }
113            _ => {}
114        }
115    }
116
117    fn file_name_parse_time(&self, name: &str, temp_name: &str) -> Option<NaiveDateTime> {
118        if name.starts_with(temp_name) {
119            let mut time_str = name.replace(temp_name, "");
120            if let Some(v) = time_str.find(".") {
121                time_str = time_str[0..v].to_string();
122            }
123            let time = chrono::NaiveDateTime::parse_from_str(&time_str, "%Y_%m_%dT%H_%M_%S");
124            if let Ok(time) = time {
125                return Some(time);
126            }
127        }
128        return None;
129    }
130}
131
132/// split log file allow pack compress log
133/// Memory space swop running time , reduces the number of repeated queries for IO
134pub struct FileSplitAppenderData {
135    max_split_bytes: usize,
136    dir_path: String,
137    file: File,
138    sender: Sender<LogPack>,
139    rolling_type: RollingType,
140    //cache data
141    temp_bytes: usize,
142    temp_name: String,
143}
144
145impl FileSplitAppenderData {
146    pub fn send_pack(&mut self) {
147        let first_file_path = format!("{}{}.log", self.dir_path, &self.temp_name);
148        let new_log_name = format!(
149            "{}{}{}.log",
150            self.dir_path,
151            &self.temp_name,
152            format!("{:29}", Local::now().format("%Y_%m_%dT%H_%M_%S%.f")).replace(" ", "_")
153        );
154        std::fs::copy(&first_file_path, &new_log_name).unwrap();
155        self.sender.send(LogPack {
156            dir: self.dir_path.clone(),
157            rolling: self.rolling_type.clone(),
158            new_log_name: new_log_name,
159        }).unwrap();
160        self.truncate();
161    }
162
163    pub fn truncate(&mut self) {
164        //reset data
165        self.file.set_len(0).unwrap();
166        self.file.seek(SeekFrom::Start(0)).unwrap();
167        self.temp_bytes = 0;
168    }
169}
170
171impl FileSplitAppender {
172    ///split_log_bytes:  log file data bytes(MB) splite
173    ///file_path:         the log dir or file name
174    ///log_pack_cap:     pack(zip,lz4 or more...) or log Waiting cap
175    /// packer: default is zip packer
176    pub fn new(
177        file_path: &str,
178        max_temp_size: LogSize,
179        rolling_type: RollingType,
180        packer: Box<dyn Packer>,
181    ) -> FileSplitAppender {
182        let mut dir_path = file_path.to_owned();
183        let mut temp_file_name = dir_path.to_string();
184        if dir_path.contains("/"){
185            let new_dir_path = dir_path[0..dir_path.rfind("/").unwrap_or_default()].to_string()+"/";
186            std::fs::create_dir_all(&new_dir_path).unwrap();
187            temp_file_name = dir_path.trim_start_matches(&new_dir_path).to_string();
188            dir_path = new_dir_path;
189        }
190        if temp_file_name.is_empty(){
191            temp_file_name = "temp.log".to_string();
192        }
193        if !dir_path.is_empty() && dir_path.ends_with(".log") {
194            panic!("FileCompactionAppender only support new from path,for example: 'logs/xx/'");
195        }
196        if !dir_path.is_empty() && !dir_path.ends_with("/") {
197            panic!("FileCompactionAppender only support new from path,for example: 'logs/xx/'");
198        }
199        if !dir_path.is_empty() {
200            std::fs::create_dir_all(&dir_path).unwrap();
201        }
202        let file_name = temp_file_name.trim_end_matches(".log");
203        let first_file_path = format!("{}{}.log", &dir_path, file_name);
204        let file = OpenOptions::new()
205            .create(true)
206            .read(true)
207            .write(true)
208            .open(first_file_path.as_str());
209        if file.is_err() {
210            panic!(
211                "[venom_log] open and create file fail:{}",
212                file.err().unwrap()
213            );
214        }
215        let mut file = file.unwrap();
216        let mut temp_bytes = 0;
217        if let Ok(m) = file.metadata() {
218            temp_bytes = m.len() as usize;
219        }
220        file.seek(SeekFrom::Start(temp_bytes as u64)).unwrap();
221        let (sender, receiver) = chan();
222        spawn_saver(file_name,receiver, packer);
223        Self {
224            cell: RefCell::new(FileSplitAppenderData {
225                max_split_bytes: max_temp_size.get_len(),
226                temp_bytes: temp_bytes,
227                dir_path: dir_path.to_string(),
228                file: file,
229                sender: sender,
230                rolling_type: rolling_type,
231                temp_name: file_name.to_string()
232            }),
233        }
234    }
235}
236
237impl LogAppender for FileSplitAppender {
238    fn do_log(&self, record: &VenomLogRecord) {
239        let mut data = self.cell.borrow_mut();
240        if record.command.eq(&Command::CommandFlush) || (data.temp_bytes >= data.max_split_bytes) {
241            data.send_pack();
242            return;
243        }
244        let mut write_bytes = 0;
245        let w = data.file.write(record.formated.as_bytes());
246        if let Ok(w) = w {
247            write_bytes = write_bytes + w;
248        }
249        data.file.flush().unwrap();
250        data.temp_bytes += write_bytes;
251    }
252}
253
254///spawn an saver thread to save log file or zip file
255fn spawn_saver(temp_name: &str, r: Receiver<LogPack>, packer: Box<dyn Packer>) {
256    let temp = temp_name.to_string();
257    std::thread::spawn(move || {
258        loop {
259            if let Ok(pack) = r.recv() {
260                //do rolling
261                pack.rolling.do_rolling(&temp, &pack.dir);
262                let log_file_path = pack.new_log_name.clone();
263                //do save pack
264                let remove = do_pack(&packer, pack);
265                if let Ok(remove) = remove {
266                    if remove {
267                        std::fs::remove_file(log_file_path).unwrap();
268                    }
269                }
270            }
271        }
272    });
273}
274
275/// write an Pack to zip file
276pub fn do_pack(packer: &Box<dyn Packer>, mut pack: LogPack) -> Result<bool, LogPack> {
277    let log_file_path = pack.new_log_name.as_str();
278    if log_file_path.is_empty() {
279        return Err(pack);
280    }
281    let log_file = OpenOptions::new().read(true).open(log_file_path);
282    if log_file.is_err() {
283        return Err(pack);
284    }
285    let log_file = log_file.unwrap();
286    //make
287    let r = packer.do_pack(log_file, log_file_path);
288    if r.is_err() && packer.retry() > 0 {
289        let mut retry = 1;
290        while let Err(packs) = do_pack(packer, pack) {
291            pack = packs;
292            retry += 1;
293            if retry > packer.retry() {
294                break;
295            }
296        }
297    }
298    if let Ok(b) = r {
299        return Ok(b);
300    }
301    return Ok(false);
302}