// sentinel_core/core/log/metric/searcher.rs

use super::*;
use crate::{logging, Error, Result};
use std::fs::File;
use std::io::{BufReader, ErrorKind, Read, Seek, SeekFrom};
use std::sync::Mutex;
/// Cached location of the last successful lookup in a metric idx file.
///
/// A later search whose begin time is not earlier than `cur_sec_in_idx` can resume
/// scanning `idx_filename` from `cur_offset_in_idx` instead of from its beginning.
#[derive(Debug)]
pub struct FilePosition {
    /// Metric file whose idx file produced the cached position (empty when nothing is cached).
    pub metric_filename: PathBuf,
    /// Idx file the cached position refers to (empty when nothing is cached).
    pub idx_filename: PathBuf,
    /// Position in the idx file of the cached record.
    pub cur_offset_in_idx: SeekFrom,
    /// Second value of the idx record located at `cur_offset_in_idx`.
    pub cur_sec_in_idx: u64,
    // TODO: consider caching the idx file handle here as well.
}

impl Default for FilePosition {
    /// An empty cache entry: no files, offset at the very start, second 0.
    fn default() -> Self {
        Self {
            metric_filename: PathBuf::new(),
            idx_filename: PathBuf::new(),
            cur_offset_in_idx: SeekFrom::Start(0),
            cur_sec_in_idx: 0,
        }
    }
}

26pub struct DefaultMetricSearcher {
27    pub reader: DefaultMetricLogReader,
28    pub base_dir: PathBuf,
29    pub base_filename: PathBuf,
30    pub cached_pos: Mutex<FilePosition>,
31}
32
33impl DefaultMetricSearcher {
34    pub fn new(base_dir: String, base_filename: String) -> Result<Self> {
35        if base_dir.len() == 0 {
36            return Err(Error::msg("empty base directory"));
37        }
38        if base_filename.len() == 0 {
39            return Err(Error::msg("empty base filename pattern"));
40        }
41        let reader = DefaultMetricLogReader::new();
42        Ok(DefaultMetricSearcher {
43            base_dir: PathBuf::from(base_dir),
44            base_filename: PathBuf::from(base_filename),
45            reader,
46            cached_pos: Mutex::new(FilePosition::default()),
47        })
48    }
49
50    pub fn find_by_time_and_resource(
51        &self,
52        begin_time_ms: u64,
53        end_time_ms: u64,
54        resource: String,
55    ) -> Result<MetricItemVec> {
56        self.search_offset_and_read(begin_time_ms, &move |filenames: Vec<PathBuf>,
57                                                          file_no: usize,
58                                                          offset: SeekFrom|
59              -> Result<MetricItemVec> {
60            self.reader.read_metrics_by_end_time(
61                filenames,
62                file_no,
63                offset,
64                begin_time_ms,
65                end_time_ms,
66                resource.clone(),
67            )
68        })
69    }
70
71    pub fn find_from_time_with_max_lines(
72        &self,
73        begin_time_ms: u64,
74        max_lines: usize,
75    ) -> Result<MetricItemVec> {
76        self.search_offset_and_read(begin_time_ms, &|filenames: Vec<PathBuf>,
77                                                     file_no: usize,
78                                                     offset: SeekFrom|
79         -> Result<MetricItemVec> {
80            self.reader
81                .read_metrics(filenames, file_no, offset, max_lines)
82        })
83    }
84
85    pub fn search_offset_and_read(
86        &self,
87        begin_time_ms: u64,
88        do_read: &dyn Fn(Vec<PathBuf>, usize, SeekFrom) -> Result<MetricItemVec>,
89    ) -> Result<MetricItemVec> {
90        let filenames = list_metric_files(&self.base_dir, &self.base_filename)?;
91        // Try to position the latest file index and offset from the cache (fast-path).
92        // If cache is not up-to-date, we'll read from the initial position (offset 0 of the first file).
93        let (offset_start, file_no) =
94            self.get_offset_start_and_file_idx(&filenames, begin_time_ms)?;
95        for i in file_no..filenames.len() {
96            let filename = &filenames[i];
97            // Retrieve the start offset that is valid for given condition.
98            // If offset = -1, it indicates that current file (i) does not satisfy the condition.
99            let offset =
100                self.find_offset_to_start(filename.to_str().unwrap(), begin_time_ms, offset_start);
101            match offset {
102                Ok(offset) => {
103                    // Read metric items from the offset of current file (number i).
104                    return do_read(filenames, i, SeekFrom::Start(offset as u64));
105                }
106                Err(err) => {
107                    logging::warn!("[search_offset_and_read] Failed to find_offset_to_start, will try next file, begin_time_ms: {}, filename: {:?}, offset_start: {:?}, err: {:?}", begin_time_ms, filename, offset_start, err);
108                }
109            }
110        }
111        return Ok(Vec::new());
112    }
113
114    fn get_offset_start_and_file_idx(
115        &self,
116        filenames: &Vec<PathBuf>,
117        begin_time_ms: u64,
118    ) -> Result<(SeekFrom, usize)> {
119        let cache_ok = self.is_position_in_time_for(begin_time_ms)?;
120        let mut i = 0;
121        let mut offset_in_idx = SeekFrom::Start(0);
122        let cached_pos = self.cached_pos.lock().unwrap();
123        if cache_ok {
124            for (j, v) in filenames.iter().enumerate() {
125                if v != &cached_pos.metric_filename {
126                    i = j;
127                    offset_in_idx = cached_pos.cur_offset_in_idx;
128                    break;
129                }
130            }
131        }
132        Ok((offset_in_idx, i))
133    }
134
135    fn find_offset_to_start(
136        &self,
137        filename: &str,
138        begin_time_ms: u64,
139        last_pos: SeekFrom,
140    ) -> Result<u32> {
141        let mut cached_pos = self.cached_pos.lock().unwrap();
142        cached_pos.idx_filename = "".into();
143        cached_pos.metric_filename = "".into();
144
145        let idx_filename = form_metric_idx_filename(filename);
146        let begin_sec = begin_time_ms / 1000;
147        let mut file = File::open(&idx_filename)?;
148
149        // Set position to the offset recorded in the idx file
150        cached_pos.cur_offset_in_idx = SeekFrom::Start(file.seek(last_pos)?);
151        let mut sec: u64;
152        loop {
153            let mut buffer: [u8; 8] = [0; 8];
154            file.read(&mut buffer)?;
155            sec = u64::from_be_bytes(buffer);
156            if sec >= begin_sec {
157                break;
158            }
159            let mut buffer: [u8; 4] = [0; 4];
160            file.read(&mut buffer)?;
161            cached_pos.cur_offset_in_idx = SeekFrom::Start(file.seek(SeekFrom::Current(0))?);
162        }
163        let mut buffer: [u8; 4] = [0; 4];
164        file.read(&mut buffer)?;
165        let offset = u32::from_be_bytes(buffer);
166        // Cache the idx filename and position
167        cached_pos.metric_filename = filename.into();
168        cached_pos.idx_filename = idx_filename.into();
169        cached_pos.cur_sec_in_idx = sec;
170        Ok(offset)
171    }
172
173    fn is_position_in_time_for(&self, begin_time_ms: u64) -> Result<bool> {
174        let cached_pos = self.cached_pos.lock().unwrap();
175        if begin_time_ms / 1000 < cached_pos.cur_sec_in_idx {
176            return Ok(false);
177        }
178        let idx_filename = &cached_pos.idx_filename;
179        if idx_filename == &PathBuf::from("") {
180            return Ok(false);
181        }
182        let mut idx_file = open_file_and_seek_to(&idx_filename, cached_pos.cur_offset_in_idx)?;
183
184        let mut buffer: [u8; 8] = [0; 8];
185        idx_file.read(&mut buffer)?;
186        let sec = u64::from_be_bytes(buffer);
187
188        Ok(sec == cached_pos.cur_sec_in_idx)
189    }
190}