sentinel_core/core/log/metric/
writer.rs

1use super::*;
2use crate::{config, logging, utils, Error, Result};
3use std::fs::{DirBuilder, File};
4use std::io::{Seek, SeekFrom};
5use std::{io::Write, sync::RwLock};
6
7#[derive(Default)]
8pub struct DefaultMetricLogWriter {
9    base_dir: PathBuf,
10    base_filename: PathBuf,
11    max_single_size: u64,
12    max_file_amount: usize,
13    latest_op_sec: u64,
14    cur_metric_file: Option<RwLock<File>>,
15    cur_metric_idx_file: Option<RwLock<File>>,
16}
17
18impl MetricLogWriter for DefaultMetricLogWriter {
19    fn write(&mut self, ts: u64, items: &mut Vec<MetricItem>) -> Result<()> {
20        if items.len() == 0 {
21            return Ok(());
22        }
23        if ts == 0 {
24            return Err(Error::msg(format!("Invalid timestamp: {}", ts)));
25        }
26        if self.cur_metric_file.is_none() || self.cur_metric_idx_file.is_none() {
27            return Err(Error::msg(format!("file handle not initialized")));
28        }
29        // Update all metric items to the given timestamp.
30        for item in items.iter_mut() {
31            item.timestamp = ts
32        }
33
34        let time_sec = ts / 1000;
35        if time_sec < self.latest_op_sec {
36            // ignore
37            return Ok(());
38        }
39        if time_sec > self.latest_op_sec {
40            let pos = self
41                .cur_metric_file
42                .as_ref()
43                .unwrap()
44                .write()
45                .unwrap()
46                .seek(SeekFrom::Current(0))?;
47            self.write_index(time_sec, pos)?;
48            if self.is_new_day(self.latest_op_sec, time_sec) {
49                self.roll_to_next_file(ts)?;
50            }
51        }
52        // Write and flush
53        self.write_items_and_flush(items)?;
54        self.roll_file_size_exceeded(ts)?;
55        if time_sec > self.latest_op_sec {
56            // Update the latest time_sec.
57            self.latest_op_sec = time_sec;
58        }
59
60        return Ok(());
61    }
62}
63
64impl DefaultMetricLogWriter {
65    fn write_items_and_flush(&self, items: &Vec<MetricItem>) -> Result<()> {
66        let mut metric_out = self.cur_metric_file.as_ref().unwrap().write().unwrap();
67        for item in items {
68            // Append the LF line separator.
69            let s = item.to_string() + "\n";
70            metric_out.write(s.as_ref())?;
71        }
72        metric_out.flush()?;
73        Ok(())
74    }
75
76    /// Check whether the file size is exceeded `config::SINGLE_FILE_MAX_SIZE`.
77    /// If so, roll to the next file.
78    fn roll_file_size_exceeded(&mut self, time: u64) -> Result<()> {
79        if self.cur_metric_file.is_none() {
80            return Ok(());
81        }
82        let file_len = self
83            .cur_metric_file
84            .as_ref()
85            .unwrap()
86            .read()
87            .unwrap()
88            .metadata()?
89            .len();
90        if file_len >= self.max_single_size {
91            return self.roll_to_next_file(time);
92        }
93        Ok(())
94    }
95
96    /// Close last file and open a new one.
97    fn roll_to_next_file(&mut self, time: u64) -> Result<()> {
98        // pay attention, if the computation name of the next file is failed,
99        // the old file won't be closed and metric logs would be append to the old file.
100        // And it may also lead to failure when deleting deprecated metric logs, since it also depnds on this .
101        let new_filename = self.next_file_name_of_time(time)?;
102        self.close_cur_and_new_file(new_filename)
103    }
104
105    fn write_index(&self, time: u64, offset: u64) -> Result<()> {
106        // Use BigEndian here to keep consistent with DataOutputStream in Java.
107        let mut idx_out = self.cur_metric_idx_file.as_ref().unwrap().write().unwrap();
108        idx_out.write(&time.to_be_bytes())?;
109        idx_out.write(&offset.to_be_bytes())?;
110        idx_out.flush()?;
111        Ok(())
112    }
113
114    /// Remove the outdated metric log files and corresponding index files,
115    /// incase that log files accumulate exceedng the `config::MAX_FILE_AMOUNT`.
116    fn remove_deprecated_files(&self) -> Result<()> {
117        let files = list_metric_files(&self.base_dir, &self.base_filename)?;
118        if files.len() >= self.max_file_amount {
119            let amount_to_remove = files.len() - self.max_file_amount + 1;
120            for i in 0..amount_to_remove {
121                let filename = &files[i];
122                let idx_filename = form_metric_idx_filename(filename.to_str().unwrap());
123                match fs::remove_file(filename) {
124                    Ok(_) => {
125                        logging::info!("[MetricWriter] Metric log file removed in DefaultMetricLogWriter.remove_deprecated_files(), filename: {:?}", filename);
126                    }
127                    Err(err) => {
128                        logging::error!("Failed to remove metric log file in DefaultMetricLogWriter::remove_deprecated_files(), filename: {:?}, error: {:?}", filename, err);
129                    }
130                }
131                match fs::remove_file(idx_filename) {
132                    Ok(_) => {
133                        logging::info!("[MetricWriter] Metric index file removed in DefaultMetricLogWriter.remove_deprecated_files(), filename: {:?}", filename);
134                    }
135                    Err(err) => {
136                        logging::error!("Failed to remove metric index log file in DefaultMetricLogWriter::remove_deprecated_files(), filename: {:?}, error: {:?}", filename, err);
137                    }
138                }
139            }
140        }
141        Ok(())
142    }
143
144    /// Compute the next file name of the given time. Find the lastest file with the same prefix pattern and add increase the order.
145    /// And never use `fmt::Debug` to print the file name (either `String/&str` or `PathBuf/&Path`), since it will contain `\"`.
146    fn next_file_name_of_time(&self, time: u64) -> Result<String> {
147        let date_str = utils::format_date(time);
148        let file_pattern = self.base_filename.to_str().unwrap().to_owned() + "." + &date_str;
149        let list = list_metric_files_conditional(
150            &self.base_dir,
151            &PathBuf::from(&file_pattern),
152            |filename: &str, p: &str| -> bool { filename.contains(p) },
153        )?;
154        if list.len() == 0 {
155            return Ok(self.base_dir.to_str().unwrap().to_owned() + &file_pattern);
156        }
157        // Find files with the same prefix pattern, have to add the order to separate files.
158        let last = &list[list.len() - 1];
159        let mut n = 0;
160        let items = last.to_str().unwrap().split(".").collect::<Vec<&str>>();
161        if items.len() > 0 {
162            n = str::parse::<u32>(items[items.len() - 1]).unwrap_or(0);
163        }
164        return Ok(format!(
165            "{}{}.{}",
166            self.base_dir.to_str().unwrap().to_owned(),
167            file_pattern,
168            n + 1
169        ));
170    }
171
172    fn close_cur_and_new_file(&mut self, filename: String) -> Result<()> {
173        self.remove_deprecated_files()?;
174
175        if self.cur_metric_file.is_some() {
176            self.cur_metric_file.take();
177        }
178        if self.cur_metric_idx_file.is_some() {
179            self.cur_metric_idx_file.take();
180        }
181        // Create new metric log file, whether it exists or not.
182        let mf = fs::File::create(&filename)?;
183        logging::info!(
184            "[MetricWriter] New metric log file created, filename {:?}",
185            filename
186        );
187
188        let idx_file = form_metric_idx_filename(&filename);
189        let mif = fs::File::create(&idx_file)?;
190        logging::info!(
191            "[MetricWriter] New metric log index file created, idx_file {:?}",
192            idx_file
193        );
194
195        self.cur_metric_file = Some(RwLock::new(mf));
196        self.cur_metric_idx_file = Some(RwLock::new(mif));
197
198        return Ok(());
199    }
200
201    fn initialize(&mut self) -> Result<()> {
202        // Create the dir if not exists.
203        DirBuilder::new().recursive(true).create(&self.base_dir)?;
204        if self.cur_metric_file.is_some() {
205            return Ok(());
206        }
207        let ts = utils::curr_time_millis();
208        self.roll_to_next_file(ts)?;
209        self.latest_op_sec = ts / 1000;
210        return Ok(());
211    }
212
213    fn is_new_day(&self, last_sec: u64, sec: u64) -> bool {
214        sec / 86400 > last_sec / 86400
215    }
216
217    fn new_of_app(
218        max_single_size: u64,
219        max_file_amount: usize,
220        app_name: String,
221    ) -> Result<DefaultMetricLogWriter> {
222        if max_single_size == 0 || max_file_amount == 0 {
223            return Err(Error::msg("invalid max_size or max_file_amount"));
224        }
225        let base_dir = PathBuf::from(config::log_metrc_dir());
226        let base_filename = form_metric_filename(&app_name, config::log_metrc_pid()).into();
227
228        let mut writer = DefaultMetricLogWriter {
229            base_dir,
230            base_filename,
231            max_single_size,
232            max_file_amount,
233            latest_op_sec: 0,
234            ..Default::default()
235        };
236        writer.initialize()?;
237        Ok(writer)
238    }
239
240    pub fn new(max_size: u64, max_file_amount: usize) -> Result<DefaultMetricLogWriter> {
241        Self::new_of_app(max_size, max_file_amount, config::app_name())
242    }
243}