sentinel_core/core/log/metric/
writer.rs1use super::*;
2use crate::{config, logging, utils, Error, Result};
3use std::fs::{DirBuilder, File};
4use std::io::{Seek, SeekFrom};
5use std::{io::Write, sync::RwLock};
6
7#[derive(Default)]
8pub struct DefaultMetricLogWriter {
9 base_dir: PathBuf,
10 base_filename: PathBuf,
11 max_single_size: u64,
12 max_file_amount: usize,
13 latest_op_sec: u64,
14 cur_metric_file: Option<RwLock<File>>,
15 cur_metric_idx_file: Option<RwLock<File>>,
16}
17
18impl MetricLogWriter for DefaultMetricLogWriter {
19 fn write(&mut self, ts: u64, items: &mut Vec<MetricItem>) -> Result<()> {
20 if items.len() == 0 {
21 return Ok(());
22 }
23 if ts == 0 {
24 return Err(Error::msg(format!("Invalid timestamp: {}", ts)));
25 }
26 if self.cur_metric_file.is_none() || self.cur_metric_idx_file.is_none() {
27 return Err(Error::msg(format!("file handle not initialized")));
28 }
29 for item in items.iter_mut() {
31 item.timestamp = ts
32 }
33
34 let time_sec = ts / 1000;
35 if time_sec < self.latest_op_sec {
36 return Ok(());
38 }
39 if time_sec > self.latest_op_sec {
40 let pos = self
41 .cur_metric_file
42 .as_ref()
43 .unwrap()
44 .write()
45 .unwrap()
46 .seek(SeekFrom::Current(0))?;
47 self.write_index(time_sec, pos)?;
48 if self.is_new_day(self.latest_op_sec, time_sec) {
49 self.roll_to_next_file(ts)?;
50 }
51 }
52 self.write_items_and_flush(items)?;
54 self.roll_file_size_exceeded(ts)?;
55 if time_sec > self.latest_op_sec {
56 self.latest_op_sec = time_sec;
58 }
59
60 return Ok(());
61 }
62}
63
64impl DefaultMetricLogWriter {
65 fn write_items_and_flush(&self, items: &Vec<MetricItem>) -> Result<()> {
66 let mut metric_out = self.cur_metric_file.as_ref().unwrap().write().unwrap();
67 for item in items {
68 let s = item.to_string() + "\n";
70 metric_out.write(s.as_ref())?;
71 }
72 metric_out.flush()?;
73 Ok(())
74 }
75
76 fn roll_file_size_exceeded(&mut self, time: u64) -> Result<()> {
79 if self.cur_metric_file.is_none() {
80 return Ok(());
81 }
82 let file_len = self
83 .cur_metric_file
84 .as_ref()
85 .unwrap()
86 .read()
87 .unwrap()
88 .metadata()?
89 .len();
90 if file_len >= self.max_single_size {
91 return self.roll_to_next_file(time);
92 }
93 Ok(())
94 }
95
96 fn roll_to_next_file(&mut self, time: u64) -> Result<()> {
98 let new_filename = self.next_file_name_of_time(time)?;
102 self.close_cur_and_new_file(new_filename)
103 }
104
105 fn write_index(&self, time: u64, offset: u64) -> Result<()> {
106 let mut idx_out = self.cur_metric_idx_file.as_ref().unwrap().write().unwrap();
108 idx_out.write(&time.to_be_bytes())?;
109 idx_out.write(&offset.to_be_bytes())?;
110 idx_out.flush()?;
111 Ok(())
112 }
113
114 fn remove_deprecated_files(&self) -> Result<()> {
117 let files = list_metric_files(&self.base_dir, &self.base_filename)?;
118 if files.len() >= self.max_file_amount {
119 let amount_to_remove = files.len() - self.max_file_amount + 1;
120 for i in 0..amount_to_remove {
121 let filename = &files[i];
122 let idx_filename = form_metric_idx_filename(filename.to_str().unwrap());
123 match fs::remove_file(filename) {
124 Ok(_) => {
125 logging::info!("[MetricWriter] Metric log file removed in DefaultMetricLogWriter.remove_deprecated_files(), filename: {:?}", filename);
126 }
127 Err(err) => {
128 logging::error!("Failed to remove metric log file in DefaultMetricLogWriter::remove_deprecated_files(), filename: {:?}, error: {:?}", filename, err);
129 }
130 }
131 match fs::remove_file(idx_filename) {
132 Ok(_) => {
133 logging::info!("[MetricWriter] Metric index file removed in DefaultMetricLogWriter.remove_deprecated_files(), filename: {:?}", filename);
134 }
135 Err(err) => {
136 logging::error!("Failed to remove metric index log file in DefaultMetricLogWriter::remove_deprecated_files(), filename: {:?}, error: {:?}", filename, err);
137 }
138 }
139 }
140 }
141 Ok(())
142 }
143
144 fn next_file_name_of_time(&self, time: u64) -> Result<String> {
147 let date_str = utils::format_date(time);
148 let file_pattern = self.base_filename.to_str().unwrap().to_owned() + "." + &date_str;
149 let list = list_metric_files_conditional(
150 &self.base_dir,
151 &PathBuf::from(&file_pattern),
152 |filename: &str, p: &str| -> bool { filename.contains(p) },
153 )?;
154 if list.len() == 0 {
155 return Ok(self.base_dir.to_str().unwrap().to_owned() + &file_pattern);
156 }
157 let last = &list[list.len() - 1];
159 let mut n = 0;
160 let items = last.to_str().unwrap().split(".").collect::<Vec<&str>>();
161 if items.len() > 0 {
162 n = str::parse::<u32>(items[items.len() - 1]).unwrap_or(0);
163 }
164 return Ok(format!(
165 "{}{}.{}",
166 self.base_dir.to_str().unwrap().to_owned(),
167 file_pattern,
168 n + 1
169 ));
170 }
171
172 fn close_cur_and_new_file(&mut self, filename: String) -> Result<()> {
173 self.remove_deprecated_files()?;
174
175 if self.cur_metric_file.is_some() {
176 self.cur_metric_file.take();
177 }
178 if self.cur_metric_idx_file.is_some() {
179 self.cur_metric_idx_file.take();
180 }
181 let mf = fs::File::create(&filename)?;
183 logging::info!(
184 "[MetricWriter] New metric log file created, filename {:?}",
185 filename
186 );
187
188 let idx_file = form_metric_idx_filename(&filename);
189 let mif = fs::File::create(&idx_file)?;
190 logging::info!(
191 "[MetricWriter] New metric log index file created, idx_file {:?}",
192 idx_file
193 );
194
195 self.cur_metric_file = Some(RwLock::new(mf));
196 self.cur_metric_idx_file = Some(RwLock::new(mif));
197
198 return Ok(());
199 }
200
201 fn initialize(&mut self) -> Result<()> {
202 DirBuilder::new().recursive(true).create(&self.base_dir)?;
204 if self.cur_metric_file.is_some() {
205 return Ok(());
206 }
207 let ts = utils::curr_time_millis();
208 self.roll_to_next_file(ts)?;
209 self.latest_op_sec = ts / 1000;
210 return Ok(());
211 }
212
213 fn is_new_day(&self, last_sec: u64, sec: u64) -> bool {
214 sec / 86400 > last_sec / 86400
215 }
216
217 fn new_of_app(
218 max_single_size: u64,
219 max_file_amount: usize,
220 app_name: String,
221 ) -> Result<DefaultMetricLogWriter> {
222 if max_single_size == 0 || max_file_amount == 0 {
223 return Err(Error::msg("invalid max_size or max_file_amount"));
224 }
225 let base_dir = PathBuf::from(config::log_metrc_dir());
226 let base_filename = form_metric_filename(&app_name, config::log_metrc_pid()).into();
227
228 let mut writer = DefaultMetricLogWriter {
229 base_dir,
230 base_filename,
231 max_single_size,
232 max_file_amount,
233 latest_op_sec: 0,
234 ..Default::default()
235 };
236 writer.initialize()?;
237 Ok(writer)
238 }
239
240 pub fn new(max_size: u64, max_file_amount: usize) -> Result<DefaultMetricLogWriter> {
241 Self::new_of_app(max_size, max_file_amount, config::app_name())
242 }
243}