use super::*;
use crate::{config, logging, utils, Error, Result};
use std::fs::{DirBuilder, File};
use std::io::{Seek, SeekFrom};
use std::{io::Write, sync::RwLock};
#[derive(Default)]
pub struct DefaultMetricLogWriter {
base_dir: PathBuf,
base_filename: PathBuf,
max_single_size: u64,
max_file_amount: usize,
latest_op_sec: u64,
cur_metric_file: Option<RwLock<File>>,
cur_metric_idx_file: Option<RwLock<File>>,
}
impl MetricLogWriter for DefaultMetricLogWriter {
fn write(&mut self, ts: u64, items: &mut Vec<MetricItem>) -> Result<()> {
if items.len() == 0 {
return Ok(());
}
if ts == 0 {
return Err(Error::msg(format!("Invalid timestamp: {}", ts)));
}
if self.cur_metric_file.is_none() || self.cur_metric_idx_file.is_none() {
return Err(Error::msg(format!("file handle not initialized")));
}
for item in items.iter_mut() {
item.timestamp = ts
}
let time_sec = ts / 1000;
if time_sec < self.latest_op_sec {
return Ok(());
}
if time_sec > self.latest_op_sec {
let pos = self
.cur_metric_file
.as_ref()
.unwrap()
.write()
.unwrap()
.seek(SeekFrom::Current(0))?;
self.write_index(time_sec, pos)?;
if self.is_new_day(self.latest_op_sec, time_sec) {
self.roll_to_next_file(ts)?;
}
}
self.write_items_and_flush(items)?;
self.roll_file_size_exceeded(ts)?;
if time_sec > self.latest_op_sec {
self.latest_op_sec = time_sec;
}
return Ok(());
}
}
impl DefaultMetricLogWriter {
fn write_items_and_flush(&self, items: &Vec<MetricItem>) -> Result<()> {
let mut metric_out = self.cur_metric_file.as_ref().unwrap().write().unwrap();
for item in items {
let s = item.to_string() + "\n";
metric_out.write(s.as_ref())?;
}
metric_out.flush()?;
Ok(())
}
fn roll_file_size_exceeded(&mut self, time: u64) -> Result<()> {
if self.cur_metric_file.is_none() {
return Ok(());
}
let file_len = self
.cur_metric_file
.as_ref()
.unwrap()
.read()
.unwrap()
.metadata()?
.len();
if file_len >= self.max_single_size {
return self.roll_to_next_file(time);
}
Ok(())
}
fn roll_to_next_file(&mut self, time: u64) -> Result<()> {
let new_filename = self.next_file_name_of_time(time)?;
self.close_cur_and_new_file(new_filename)
}
fn write_index(&self, time: u64, offset: u64) -> Result<()> {
let mut idx_out = self.cur_metric_idx_file.as_ref().unwrap().write().unwrap();
idx_out.write(&time.to_be_bytes())?;
idx_out.write(&offset.to_be_bytes())?;
idx_out.flush()?;
Ok(())
}
fn remove_deprecated_files(&self) -> Result<()> {
let files = list_metric_files(&self.base_dir, &self.base_filename)?;
if files.len() >= self.max_file_amount {
let amount_to_remove = files.len() - self.max_file_amount + 1;
for i in 0..amount_to_remove {
let filename = &files[i];
let idx_filename = form_metric_idx_filename(filename.to_str().unwrap());
match fs::remove_file(filename) {
Ok(_) => {
logging::info!("[MetricWriter] Metric log file removed in DefaultMetricLogWriter.remove_deprecated_files(), filename: {:?}", filename);
}
Err(err) => {
logging::error!("Failed to remove metric log file in DefaultMetricLogWriter::remove_deprecated_files(), filename: {:?}, error: {:?}", filename, err);
}
}
match fs::remove_file(idx_filename) {
Ok(_) => {
logging::info!("[MetricWriter] Metric index file removed in DefaultMetricLogWriter.remove_deprecated_files(), filename: {:?}", filename);
}
Err(err) => {
logging::error!("Failed to remove metric index log file in DefaultMetricLogWriter::remove_deprecated_files(), filename: {:?}, error: {:?}", filename, err);
}
}
}
}
Ok(())
}
fn next_file_name_of_time(&self, time: u64) -> Result<String> {
let date_str = utils::format_date(time);
let file_pattern = self.base_filename.to_str().unwrap().to_owned() + "." + &date_str;
let list = list_metric_files_conditional(
&self.base_dir,
&PathBuf::from(&file_pattern),
|filename: &str, p: &str| -> bool { filename.contains(p) },
)?;
if list.len() == 0 {
return Ok(self.base_dir.to_str().unwrap().to_owned() + &file_pattern);
}
let last = &list[list.len() - 1];
let mut n = 0;
let items = last.to_str().unwrap().split(".").collect::<Vec<&str>>();
if items.len() > 0 {
n = str::parse::<u32>(items[items.len() - 1]).unwrap_or(0);
}
return Ok(format!(
"{}{}.{}",
self.base_dir.to_str().unwrap().to_owned(),
file_pattern,
n + 1
));
}
fn close_cur_and_new_file(&mut self, filename: String) -> Result<()> {
self.remove_deprecated_files()?;
if self.cur_metric_file.is_some() {
self.cur_metric_file.take();
}
if self.cur_metric_idx_file.is_some() {
self.cur_metric_idx_file.take();
}
let mf = fs::File::create(&filename)?;
logging::info!(
"[MetricWriter] New metric log file created, filename {:?}",
filename
);
let idx_file = form_metric_idx_filename(&filename);
let mif = fs::File::create(&idx_file)?;
logging::info!(
"[MetricWriter] New metric log index file created, idx_file {:?}",
idx_file
);
self.cur_metric_file = Some(RwLock::new(mf));
self.cur_metric_idx_file = Some(RwLock::new(mif));
return Ok(());
}
fn initialize(&mut self) -> Result<()> {
DirBuilder::new().recursive(true).create(&self.base_dir)?;
if self.cur_metric_file.is_some() {
return Ok(());
}
let ts = utils::curr_time_millis();
self.roll_to_next_file(ts)?;
self.latest_op_sec = ts / 1000;
return Ok(());
}
fn is_new_day(&self, last_sec: u64, sec: u64) -> bool {
sec / 86400 > last_sec / 86400
}
fn new_of_app(
max_single_size: u64,
max_file_amount: usize,
app_name: String,
) -> Result<DefaultMetricLogWriter> {
if max_single_size == 0 || max_file_amount == 0 {
return Err(Error::msg("invalid max_size or max_file_amount"));
}
let base_dir = PathBuf::from(config::log_metrc_dir());
let base_filename = form_metric_filename(&app_name, config::log_metrc_pid()).into();
let mut writer = DefaultMetricLogWriter {
base_dir,
base_filename,
max_single_size,
max_file_amount,
latest_op_sec: 0,
..Default::default()
};
writer.initialize()?;
Ok(writer)
}
pub fn new(max_size: u64, max_file_amount: usize) -> Result<DefaultMetricLogWriter> {
Self::new_of_app(max_size, max_file_amount, config::app_name())
}
}