sentinel_core/core/log/metric/
searcher.rs1use super::*;
2use crate::{logging, Error, Result};
3use std::fs::File;
4use std::io::{Read, Seek, SeekFrom};
5use std::sync::Mutex;
6#[derive(Debug)]
7pub struct FilePosition {
8 pub metric_filename: PathBuf,
9 pub idx_filename: PathBuf,
10 pub cur_offset_in_idx: SeekFrom,
11 pub cur_sec_in_idx: u64,
12 }
14
15impl Default for FilePosition {
16 fn default() -> Self {
17 FilePosition {
18 metric_filename: PathBuf::default(),
19 idx_filename: PathBuf::default(),
20 cur_offset_in_idx: SeekFrom::Start(0),
21 cur_sec_in_idx: 0,
22 }
23 }
24}
25
26pub struct DefaultMetricSearcher {
27 pub reader: DefaultMetricLogReader,
28 pub base_dir: PathBuf,
29 pub base_filename: PathBuf,
30 pub cached_pos: Mutex<FilePosition>,
31}
32
33impl DefaultMetricSearcher {
34 pub fn new(base_dir: String, base_filename: String) -> Result<Self> {
35 if base_dir.len() == 0 {
36 return Err(Error::msg("empty base directory"));
37 }
38 if base_filename.len() == 0 {
39 return Err(Error::msg("empty base filename pattern"));
40 }
41 let reader = DefaultMetricLogReader::new();
42 Ok(DefaultMetricSearcher {
43 base_dir: PathBuf::from(base_dir),
44 base_filename: PathBuf::from(base_filename),
45 reader,
46 cached_pos: Mutex::new(FilePosition::default()),
47 })
48 }
49
50 pub fn find_by_time_and_resource(
51 &self,
52 begin_time_ms: u64,
53 end_time_ms: u64,
54 resource: String,
55 ) -> Result<MetricItemVec> {
56 self.search_offset_and_read(begin_time_ms, &move |filenames: Vec<PathBuf>,
57 file_no: usize,
58 offset: SeekFrom|
59 -> Result<MetricItemVec> {
60 self.reader.read_metrics_by_end_time(
61 filenames,
62 file_no,
63 offset,
64 begin_time_ms,
65 end_time_ms,
66 resource.clone(),
67 )
68 })
69 }
70
71 pub fn find_from_time_with_max_lines(
72 &self,
73 begin_time_ms: u64,
74 max_lines: usize,
75 ) -> Result<MetricItemVec> {
76 self.search_offset_and_read(begin_time_ms, &|filenames: Vec<PathBuf>,
77 file_no: usize,
78 offset: SeekFrom|
79 -> Result<MetricItemVec> {
80 self.reader
81 .read_metrics(filenames, file_no, offset, max_lines)
82 })
83 }
84
85 pub fn search_offset_and_read(
86 &self,
87 begin_time_ms: u64,
88 do_read: &dyn Fn(Vec<PathBuf>, usize, SeekFrom) -> Result<MetricItemVec>,
89 ) -> Result<MetricItemVec> {
90 let filenames = list_metric_files(&self.base_dir, &self.base_filename)?;
91 let (offset_start, file_no) =
94 self.get_offset_start_and_file_idx(&filenames, begin_time_ms)?;
95 for i in file_no..filenames.len() {
96 let filename = &filenames[i];
97 let offset =
100 self.find_offset_to_start(filename.to_str().unwrap(), begin_time_ms, offset_start);
101 match offset {
102 Ok(offset) => {
103 return do_read(filenames, i, SeekFrom::Start(offset as u64));
105 }
106 Err(err) => {
107 logging::warn!("[search_offset_and_read] Failed to find_offset_to_start, will try next file, begin_time_ms: {}, filename: {:?}, offset_start: {:?}, err: {:?}", begin_time_ms, filename, offset_start, err);
108 }
109 }
110 }
111 return Ok(Vec::new());
112 }
113
114 fn get_offset_start_and_file_idx(
115 &self,
116 filenames: &Vec<PathBuf>,
117 begin_time_ms: u64,
118 ) -> Result<(SeekFrom, usize)> {
119 let cache_ok = self.is_position_in_time_for(begin_time_ms)?;
120 let mut i = 0;
121 let mut offset_in_idx = SeekFrom::Start(0);
122 let cached_pos = self.cached_pos.lock().unwrap();
123 if cache_ok {
124 for (j, v) in filenames.iter().enumerate() {
125 if v != &cached_pos.metric_filename {
126 i = j;
127 offset_in_idx = cached_pos.cur_offset_in_idx;
128 break;
129 }
130 }
131 }
132 Ok((offset_in_idx, i))
133 }
134
135 fn find_offset_to_start(
136 &self,
137 filename: &str,
138 begin_time_ms: u64,
139 last_pos: SeekFrom,
140 ) -> Result<u32> {
141 let mut cached_pos = self.cached_pos.lock().unwrap();
142 cached_pos.idx_filename = "".into();
143 cached_pos.metric_filename = "".into();
144
145 let idx_filename = form_metric_idx_filename(filename);
146 let begin_sec = begin_time_ms / 1000;
147 let mut file = File::open(&idx_filename)?;
148
149 cached_pos.cur_offset_in_idx = SeekFrom::Start(file.seek(last_pos)?);
151 let mut sec: u64;
152 loop {
153 let mut buffer: [u8; 8] = [0; 8];
154 file.read(&mut buffer)?;
155 sec = u64::from_be_bytes(buffer);
156 if sec >= begin_sec {
157 break;
158 }
159 let mut buffer: [u8; 4] = [0; 4];
160 file.read(&mut buffer)?;
161 cached_pos.cur_offset_in_idx = SeekFrom::Start(file.seek(SeekFrom::Current(0))?);
162 }
163 let mut buffer: [u8; 4] = [0; 4];
164 file.read(&mut buffer)?;
165 let offset = u32::from_be_bytes(buffer);
166 cached_pos.metric_filename = filename.into();
168 cached_pos.idx_filename = idx_filename.into();
169 cached_pos.cur_sec_in_idx = sec;
170 Ok(offset)
171 }
172
173 fn is_position_in_time_for(&self, begin_time_ms: u64) -> Result<bool> {
174 let cached_pos = self.cached_pos.lock().unwrap();
175 if begin_time_ms / 1000 < cached_pos.cur_sec_in_idx {
176 return Ok(false);
177 }
178 let idx_filename = &cached_pos.idx_filename;
179 if idx_filename == &PathBuf::from("") {
180 return Ok(false);
181 }
182 let mut idx_file = open_file_and_seek_to(&idx_filename, cached_pos.cur_offset_in_idx)?;
183
184 let mut buffer: [u8; 8] = [0; 8];
185 idx_file.read(&mut buffer)?;
186 let sec = u64::from_be_bytes(buffer);
187
188 Ok(sec == cached_pos.cur_sec_in_idx)
189 }
190}