venom_log/plugin/
file_split.rs1use std::cell::RefCell;
11use std::fs::{ DirEntry, File, OpenOptions};
12use std::io::{ Seek, SeekFrom, Write};
13
14use chrono::{Local, NaiveDateTime};
15
16use crate::appender::{Command, VenomLogRecord, LogAppender};
17use crate::consts::LogSize;
18use std::ops::Sub;
19use std::time::Duration;
20use crate::error::LogError;
21use crate::{chan, Receiver, Sender};
22
23pub trait Packer: Send {
25 fn pack_name(&self) -> &'static str;
26 fn do_pack(&self, log_file: File, log_file_path: &str) -> Result<bool, LogError>;
28 fn retry(&self) -> i32 { return 0; }
30}
31
32pub struct FileSplitAppender {
34 cell: RefCell<FileSplitAppenderData>,
35}
36
37pub struct LogPack {
39 pub dir: String,
40 pub rolling: RollingType,
41 pub new_log_name: String,
42}
43
44#[derive(Copy, Clone, Debug)]
46pub enum RollingType {
47 All,
49 KeepTime(Duration),
54 KeepNum(i64),
56}
57
58impl RollingType {
59 fn read_paths(&self, dir: &str, temp_name: &str) -> Vec<DirEntry> {
60 let paths = std::fs::read_dir(dir);
61 if let Ok(paths) = paths {
62 let mut paths_vec = vec![];
63 for path in paths {
64 match path {
65 Ok(path) => {
66 if let Some(v) = path.file_name().to_str() {
67 if (v.ends_with(".log") && v.trim_end_matches(".log").ends_with(temp_name)) || !v.starts_with(temp_name) {
69 continue;
70 }
71 }
72 paths_vec.push(path);
73 }
74 _ => {}
75 }
76 }
77 paths_vec.sort_by(|a, b| b.file_name().cmp(&a.file_name()));
78 return paths_vec;
79 }
80 return vec![];
81 }
82
83 pub fn do_rolling(&self, temp_name: &str, dir: &str) {
84 match self {
85 RollingType::KeepNum(n) => {
86 let paths_vec = self.read_paths(dir, temp_name);
87 for index in 0..paths_vec.len() {
88 if index >= *n as usize {
89 let item = &paths_vec[index];
90 std::fs::remove_file(item.path()).unwrap();
91 }
92 }
93 }
94 RollingType::KeepTime(t) => {
95 let paths_vec = self.read_paths(dir, temp_name);
96 let duration = chrono::Duration::from_std(t.clone());
97 if duration.is_err() {
98 return;
99 }
100 let duration = duration.unwrap();
101 let now = Local::now().naive_local();
102 for index in 0..paths_vec.len() {
103 let item = &paths_vec[index];
104 let file_name = item.file_name();
105 let name = file_name.to_str().unwrap_or("").to_string();
106 if let Some(time) = self.file_name_parse_time(&name, temp_name) {
107 if now.sub(time) > duration {
108 std::fs::remove_file(item.path()).unwrap();
109 }
110 }
111 }
112 }
113 _ => {}
114 }
115 }
116
117 fn file_name_parse_time(&self, name: &str, temp_name: &str) -> Option<NaiveDateTime> {
118 if name.starts_with(temp_name) {
119 let mut time_str = name.replace(temp_name, "");
120 if let Some(v) = time_str.find(".") {
121 time_str = time_str[0..v].to_string();
122 }
123 let time = chrono::NaiveDateTime::parse_from_str(&time_str, "%Y_%m_%dT%H_%M_%S");
124 if let Ok(time) = time {
125 return Some(time);
126 }
127 }
128 return None;
129 }
130}
131
132pub struct FileSplitAppenderData {
135 max_split_bytes: usize,
136 dir_path: String,
137 file: File,
138 sender: Sender<LogPack>,
139 rolling_type: RollingType,
140 temp_bytes: usize,
142 temp_name: String,
143}
144
145impl FileSplitAppenderData {
146 pub fn send_pack(&mut self) {
147 let first_file_path = format!("{}{}.log", self.dir_path, &self.temp_name);
148 let new_log_name = format!(
149 "{}{}{}.log",
150 self.dir_path,
151 &self.temp_name,
152 format!("{:29}", Local::now().format("%Y_%m_%dT%H_%M_%S%.f")).replace(" ", "_")
153 );
154 std::fs::copy(&first_file_path, &new_log_name).unwrap();
155 self.sender.send(LogPack {
156 dir: self.dir_path.clone(),
157 rolling: self.rolling_type.clone(),
158 new_log_name: new_log_name,
159 }).unwrap();
160 self.truncate();
161 }
162
163 pub fn truncate(&mut self) {
164 self.file.set_len(0).unwrap();
166 self.file.seek(SeekFrom::Start(0)).unwrap();
167 self.temp_bytes = 0;
168 }
169}
170
171impl FileSplitAppender {
172 pub fn new(
177 file_path: &str,
178 max_temp_size: LogSize,
179 rolling_type: RollingType,
180 packer: Box<dyn Packer>,
181 ) -> FileSplitAppender {
182 let mut dir_path = file_path.to_owned();
183 let mut temp_file_name = dir_path.to_string();
184 if dir_path.contains("/"){
185 let new_dir_path = dir_path[0..dir_path.rfind("/").unwrap_or_default()].to_string()+"/";
186 std::fs::create_dir_all(&new_dir_path).unwrap();
187 temp_file_name = dir_path.trim_start_matches(&new_dir_path).to_string();
188 dir_path = new_dir_path;
189 }
190 if temp_file_name.is_empty(){
191 temp_file_name = "temp.log".to_string();
192 }
193 if !dir_path.is_empty() && dir_path.ends_with(".log") {
194 panic!("FileCompactionAppender only support new from path,for example: 'logs/xx/'");
195 }
196 if !dir_path.is_empty() && !dir_path.ends_with("/") {
197 panic!("FileCompactionAppender only support new from path,for example: 'logs/xx/'");
198 }
199 if !dir_path.is_empty() {
200 std::fs::create_dir_all(&dir_path).unwrap();
201 }
202 let file_name = temp_file_name.trim_end_matches(".log");
203 let first_file_path = format!("{}{}.log", &dir_path, file_name);
204 let file = OpenOptions::new()
205 .create(true)
206 .read(true)
207 .write(true)
208 .open(first_file_path.as_str());
209 if file.is_err() {
210 panic!(
211 "[venom_log] open and create file fail:{}",
212 file.err().unwrap()
213 );
214 }
215 let mut file = file.unwrap();
216 let mut temp_bytes = 0;
217 if let Ok(m) = file.metadata() {
218 temp_bytes = m.len() as usize;
219 }
220 file.seek(SeekFrom::Start(temp_bytes as u64)).unwrap();
221 let (sender, receiver) = chan();
222 spawn_saver(file_name,receiver, packer);
223 Self {
224 cell: RefCell::new(FileSplitAppenderData {
225 max_split_bytes: max_temp_size.get_len(),
226 temp_bytes: temp_bytes,
227 dir_path: dir_path.to_string(),
228 file: file,
229 sender: sender,
230 rolling_type: rolling_type,
231 temp_name: file_name.to_string()
232 }),
233 }
234 }
235}
236
237impl LogAppender for FileSplitAppender {
238 fn do_log(&self, record: &VenomLogRecord) {
239 let mut data = self.cell.borrow_mut();
240 if record.command.eq(&Command::CommandFlush) || (data.temp_bytes >= data.max_split_bytes) {
241 data.send_pack();
242 return;
243 }
244 let mut write_bytes = 0;
245 let w = data.file.write(record.formated.as_bytes());
246 if let Ok(w) = w {
247 write_bytes = write_bytes + w;
248 }
249 data.file.flush().unwrap();
250 data.temp_bytes += write_bytes;
251 }
252}
253
254fn spawn_saver(temp_name: &str, r: Receiver<LogPack>, packer: Box<dyn Packer>) {
256 let temp = temp_name.to_string();
257 std::thread::spawn(move || {
258 loop {
259 if let Ok(pack) = r.recv() {
260 pack.rolling.do_rolling(&temp, &pack.dir);
262 let log_file_path = pack.new_log_name.clone();
263 let remove = do_pack(&packer, pack);
265 if let Ok(remove) = remove {
266 if remove {
267 std::fs::remove_file(log_file_path).unwrap();
268 }
269 }
270 }
271 }
272 });
273}
274
275pub fn do_pack(packer: &Box<dyn Packer>, mut pack: LogPack) -> Result<bool, LogPack> {
277 let log_file_path = pack.new_log_name.as_str();
278 if log_file_path.is_empty() {
279 return Err(pack);
280 }
281 let log_file = OpenOptions::new().read(true).open(log_file_path);
282 if log_file.is_err() {
283 return Err(pack);
284 }
285 let log_file = log_file.unwrap();
286 let r = packer.do_pack(log_file, log_file_path);
288 if r.is_err() && packer.retry() > 0 {
289 let mut retry = 1;
290 while let Err(packs) = do_pack(packer, pack) {
291 pack = packs;
292 retry += 1;
293 if retry > packer.retry() {
294 break;
295 }
296 }
297 }
298 if let Ok(b) = r {
299 return Ok(b);
300 }
301 return Ok(false);
302}