// dm_database_parser_sqllog/parser/parse_functions.rs
use crate::error::ParseError;
use crate::parser::constants::*;
use crate::sqllog::{IndicatorsParts, MetaParts, Sqllog};
use crate::tools::is_record_start_line;

10pub fn parse_record(lines: &[&str]) -> Result<Sqllog, ParseError> {
45 if lines.is_empty() {
46 return Err(ParseError::EmptyInput);
47 }
48
49 let first_line = lines[0];
50
51 if !is_record_start_line(first_line) {
53 return Err(ParseError::InvalidRecordStartLine);
54 }
55
56 if first_line.len() < MIN_RECORD_LENGTH {
58 return Err(ParseError::LineTooShort(first_line.len()));
59 }
60
61 let ts = &first_line[0..TIMESTAMP_LENGTH];
63
64 let closing_paren = first_line
66 .find(')')
67 .ok_or(ParseError::MissingClosingParen)?;
68
69 if closing_paren <= META_START_INDEX {
70 return Err(ParseError::InsufficientMetaFields(0));
71 }
72
73 let meta_str = &first_line[META_START_INDEX..closing_paren];
75 let meta = parse_meta(meta_str)?;
76
77 let body_start = closing_paren + BODY_OFFSET;
79 let full_body = build_body(first_line, body_start, &lines[1..]);
80
81 let indicators = parse_indicators(&full_body).ok();
83
84 let body = if indicators.is_some() {
86 extract_sql_body(&full_body)
87 } else {
88 full_body
89 };
90
91 Ok(Sqllog {
92 ts: ts.to_string(),
93 meta,
94 body,
95 indicators,
96 })
97}
98
/// Assembles the record body from the tail of the first line and the
/// continuation lines, joining the pieces with `\n`.
///
/// * `first_line` - the record's first line.
/// * `body_start` - byte offset in `first_line` where the body begins.
/// * `continuation_lines` - the remaining lines of the record, in order.
///
/// When the first line contributes no text (the offset is at or past its
/// end), the result is just the continuation lines joined with `\n`, without
/// a leading newline. With no continuation lines the result is the first-line
/// tail alone, possibly empty.
///
/// This function never panics: an offset beyond the end of `first_line` is
/// clamped, and an offset that lands inside a multi-byte UTF-8 character is
/// moved forward to the next character boundary.
#[inline]
pub(crate) fn build_body(
    first_line: &str,
    body_start: usize,
    continuation_lines: &[&str],
) -> String {
    // Slicing a `str` at a non-boundary byte panics, so normalise the offset
    // first. `is_char_boundary(len)` is always true, which bounds the loop.
    let mut start = body_start.min(first_line.len());
    while !first_line.is_char_boundary(start) {
        start += 1;
    }
    let first_part = &first_line[start..];
    let has_first_part = !first_part.is_empty();

    // Exact capacity: all text bytes plus one '\n' between adjacent pieces.
    let piece_count = usize::from(has_first_part) + continuation_lines.len();
    let text_len = first_part.len()
        + continuation_lines
            .iter()
            .map(|line| line.len())
            .sum::<usize>();
    let mut body = String::with_capacity(text_len + piece_count.saturating_sub(1));

    body.push_str(first_part);

    // Track the separator explicitly rather than testing `body.is_empty()`:
    // an empty continuation line is still a piece and must be followed by '\n'.
    let mut needs_separator = has_first_part;
    for line in continuation_lines {
        if needs_separator {
            body.push('\n');
        }
        body.push_str(line);
        needs_separator = true;
    }

    body
}

166#[inline]
168pub(crate) fn extract_sql_body(full_body: &str) -> String {
169 INDICATOR_PATTERNS
171 .iter()
172 .filter_map(|pattern| full_body.find(pattern))
173 .min()
174 .map(|pos| full_body[..pos].trim_end().to_string())
175 .unwrap_or_else(|| full_body.to_string())
176}
177
178pub(crate) fn parse_meta(meta_str: &str) -> Result<MetaParts, ParseError> {
180 let fields: Vec<&str> = meta_str.split(' ').collect();
181
182 if fields.len() < 7 {
183 return Err(ParseError::InsufficientMetaFields(fields.len()));
184 }
185
186 let ep = parse_ep_field(fields[0])?;
188
189 let sess_id = extract_field_value(fields[1], SESS_PREFIX)?;
191 let thrd_id = extract_field_value(fields[2], THRD_PREFIX)?;
192 let username = extract_field_value(fields[3], USER_PREFIX)?;
193 let trxid = extract_field_value(fields[4], TRXID_PREFIX)?;
194 let statement = extract_field_value(fields[5], STMT_PREFIX)?;
195 let appname = extract_field_value(fields[6], APPNAME_PREFIX)?;
196
197 let client_ip = fields
199 .get(7)
200 .map(|field| extract_field_value(field, IP_PREFIX))
201 .transpose()?
202 .unwrap_or_default();
203
204 Ok(MetaParts {
205 ep,
206 sess_id,
207 thrd_id,
208 username,
209 trxid,
210 statement,
211 appname,
212 client_ip,
213 })
214}
215
216#[inline]
218pub(crate) fn parse_ep_field(ep_str: &str) -> Result<u8, ParseError> {
219 if !ep_str.starts_with("EP[") || !ep_str.ends_with(']') {
220 return Err(ParseError::InvalidEpFormat(ep_str.to_string()));
221 }
222
223 let ep_num = &ep_str[3..ep_str.len() - 1];
224 ep_num
225 .parse::<u8>()
226 .map_err(|_| ParseError::EpParseError(ep_num.to_string()))
227}
228
229#[inline]
231pub(crate) fn extract_field_value(field: &str, prefix: &str) -> Result<String, ParseError> {
232 field
233 .strip_prefix(prefix)
234 .map(|s| s.to_string())
235 .ok_or_else(|| ParseError::InvalidFieldFormat {
236 expected: prefix.to_string(),
237 actual: field.to_string(),
238 })
239}
240
241pub(crate) fn parse_indicators(body: &str) -> Result<IndicatorsParts, ParseError> {
243 let exec_time_str = extract_indicator(body, EXECTIME_PREFIX, EXECTIME_SUFFIX)?;
245 let row_count_str = extract_indicator(body, ROWCOUNT_PREFIX, ROWCOUNT_SUFFIX)?;
246 let exec_id_str = extract_indicator(body, EXEC_ID_PREFIX, EXEC_ID_SUFFIX)?;
247
248 let execute_time = exec_time_str.parse::<f32>().map_err(|_| {
249 ParseError::IndicatorsParseError(format!("执行时间解析失败: {}", exec_time_str))
250 })?;
251
252 let row_count = row_count_str.parse::<u32>().map_err(|_| {
253 ParseError::IndicatorsParseError(format!("行数解析失败: {}", row_count_str))
254 })?;
255
256 let execute_id = exec_id_str.parse::<i64>().map_err(|_| {
257 ParseError::IndicatorsParseError(format!("执行 ID 解析失败: {}", exec_id_str))
258 })?;
259
260 Ok(IndicatorsParts {
261 execute_time,
262 row_count,
263 execute_id,
264 })
265}
266
267#[inline]
269pub(crate) fn extract_indicator<'a>(
270 text: &'a str,
271 prefix: &str,
272 suffix: &str,
273) -> Result<&'a str, ParseError> {
274 let start_pos = text
275 .find(prefix)
276 .ok_or_else(|| ParseError::IndicatorsParseError(format!("未找到 {}", prefix)))?
277 + prefix.len();
278
279 let remaining = &text[start_pos..];
280 let end_offset = remaining
281 .find(suffix)
282 .ok_or_else(|| ParseError::IndicatorsParseError(format!("未找到 {}", suffix)))?;
283
284 Ok(remaining[..end_offset].trim())
285}