dm_database_parser_sqllog/parser/
parse_functions.rs1use crate::error::ParseError;
6use crate::parser::constants::*;
7use crate::sqllog::{IndicatorsParts, MetaParts, Sqllog};
8use crate::tools::is_record_start_line;
9
10pub fn parse_record(lines: &[&str]) -> Result<Sqllog, ParseError> {
45 if lines.is_empty() {
46 return Err(ParseError::EmptyInput);
47 }
48
49 let first_line = lines[0];
50
51 if !is_record_start_line(first_line) {
53 return Err(ParseError::InvalidRecordStartLine {
54 raw: first_line.to_string(),
55 });
56 }
57
58 if first_line.len() < MIN_RECORD_LENGTH {
60 return Err(ParseError::LineTooShort {
61 length: first_line.len(),
62 raw: first_line.to_string(),
63 });
64 }
65
66 let ts = &first_line[0..TIMESTAMP_LENGTH];
68
69 let closing_paren = first_line
71 .find(')')
72 .ok_or_else(|| ParseError::MissingClosingParen {
73 raw: first_line.to_string(),
74 })?;
75
76 if closing_paren <= META_START_INDEX {
77 return Err(ParseError::InsufficientMetaFields {
78 count: 0,
79 raw: first_line[META_START_INDEX..].to_string(),
80 });
81 }
82
83 let meta_str = &first_line[META_START_INDEX..closing_paren];
85 let meta = parse_meta(meta_str)?;
86
87 let body_start = closing_paren + BODY_OFFSET;
89 let full_body = build_body(first_line, body_start, &lines[1..]);
90
91 let indicators = parse_indicators(&full_body).ok();
93
94 let body = if indicators.is_some() {
96 extract_sql_body(&full_body)
97 } else {
98 full_body
99 };
100
101 Ok(Sqllog {
102 ts: ts.to_string(),
103 meta,
104 body,
105 indicators,
106 })
107}
108
109#[inline]
124pub(crate) fn build_body(
125 first_line: &str,
126 body_start: usize,
127 continuation_lines: &[&str],
128) -> String {
129 if continuation_lines.is_empty() {
130 if body_start < first_line.len() {
132 first_line[body_start..].to_string()
133 } else {
134 String::new()
135 }
136 } else {
137 let has_first_part = body_start < first_line.len();
139 let first_part_len = if has_first_part {
140 first_line.len() - body_start
141 } else {
142 0
143 };
144
145 let newline_count = if has_first_part {
146 continuation_lines.len()
147 } else {
148 continuation_lines.len() - 1
149 };
150
151 let total_len = first_part_len
152 + continuation_lines.iter().map(|s| s.len()).sum::<usize>()
153 + newline_count;
154
155 let mut result = String::with_capacity(total_len);
156
157 if has_first_part {
158 result.push_str(&first_line[body_start..]);
159 for line in continuation_lines {
160 result.push('\n');
161 result.push_str(line);
162 }
163 } else {
164 result.push_str(continuation_lines[0]);
166 for line in &continuation_lines[1..] {
167 result.push('\n');
168 result.push_str(line);
169 }
170 }
171
172 result
173 }
174}
175
176#[inline]
178pub(crate) fn extract_sql_body(full_body: &str) -> String {
179 INDICATOR_PATTERNS
181 .iter()
182 .filter_map(|pattern| full_body.find(pattern))
183 .min()
184 .map(|pos| full_body[..pos].trim_end().to_string())
185 .unwrap_or_else(|| full_body.to_string())
186}
187
188pub(crate) fn parse_meta(meta_str: &str) -> Result<MetaParts, ParseError> {
190 let ep_end = meta_str
194 .find(' ')
195 .ok_or(ParseError::InsufficientMetaFields {
196 count: 0,
197 raw: meta_str.to_string(),
198 })?;
199 let ep = parse_ep_field(&meta_str[..ep_end], meta_str)?;
200
201 let sess_start = ep_end + 1;
203 let sess_end = meta_str[sess_start..]
204 .find(' ')
205 .ok_or(ParseError::InsufficientMetaFields {
206 count: 1,
207 raw: meta_str.to_string(),
208 })?
209 + sess_start;
210 let sess_id = extract_field_value(&meta_str[sess_start..sess_end], SESS_PREFIX, meta_str)?;
211
212 let thrd_start = sess_end + 1;
214 let thrd_end = meta_str[thrd_start..]
215 .find(' ')
216 .ok_or(ParseError::InsufficientMetaFields {
217 count: 2,
218 raw: meta_str.to_string(),
219 })?
220 + thrd_start;
221 let thrd_id = extract_field_value(&meta_str[thrd_start..thrd_end], THRD_PREFIX, meta_str)?;
222
223 let user_start = thrd_end + 1;
225 let user_end = meta_str[user_start..]
226 .find(' ')
227 .ok_or(ParseError::InsufficientMetaFields {
228 count: 3,
229 raw: meta_str.to_string(),
230 })?
231 + user_start;
232 let username = extract_field_value(&meta_str[user_start..user_end], USER_PREFIX, meta_str)?;
233
234 let trxid_start = user_end + 1;
236 let trxid_end_result = meta_str[trxid_start..].find(' ');
237 let (trxid, after_trxid) = if let Some(trxid_end_offset) = trxid_end_result {
238 let trxid_end = trxid_start + trxid_end_offset;
239 (
240 extract_field_value(&meta_str[trxid_start..trxid_end], TRXID_PREFIX, meta_str)?,
241 trxid_end + 1,
242 )
243 } else {
244 (
246 extract_field_value(&meta_str[trxid_start..], TRXID_PREFIX, meta_str)?,
247 meta_str.len(),
248 )
249 };
250
251 if after_trxid >= meta_str.len() {
253 return Ok(MetaParts {
254 ep,
255 sess_id,
256 thrd_id,
257 username,
258 trxid,
259 statement: String::new(),
260 appname: String::new(),
261 client_ip: String::new(),
262 });
263 }
264
265 let stmt_start = after_trxid;
267 let stmt_end_result = meta_str[stmt_start..].find(' ');
268 let (statement, after_stmt) = if let Some(stmt_end_offset) = stmt_end_result {
269 let stmt_end = stmt_start + stmt_end_offset;
270 (
271 extract_field_value(&meta_str[stmt_start..stmt_end], STMT_PREFIX, meta_str)?,
272 stmt_end + 1,
273 )
274 } else {
275 (
277 extract_field_value(&meta_str[stmt_start..], STMT_PREFIX, meta_str)?,
278 meta_str.len(),
279 )
280 };
281
282 if after_stmt >= meta_str.len() {
284 return Ok(MetaParts {
285 ep,
286 sess_id,
287 thrd_id,
288 username,
289 trxid,
290 statement,
291 appname: String::new(),
292 client_ip: String::new(),
293 });
294 }
295
296 let appname_start = after_stmt;
298 let (appname, client_ip) = if appname_start < meta_str.len() {
299 if meta_str[appname_start..].starts_with(APPNAME_PREFIX) {
301 let appname_value_start = appname_start + APPNAME_PREFIX.len();
304 if let Some(ip_pos) = meta_str[appname_value_start..].find(" ip:::ffff:") {
305 let appname_value = &meta_str[appname_value_start..appname_value_start + ip_pos];
307 let ip_start = appname_value_start + ip_pos + 1;
308 let client_ip = extract_field_value(&meta_str[ip_start..], IP_PREFIX, meta_str)?;
309 (appname_value.to_string(), client_ip)
310 } else {
311 let appname_value = &meta_str[appname_value_start..];
313 (appname_value.to_string(), String::new())
314 }
315 } else {
316 (String::new(), String::new())
318 }
319 } else {
320 (String::new(), String::new())
321 };
322
323 Ok(MetaParts {
324 ep,
325 sess_id,
326 thrd_id,
327 username,
328 trxid,
329 statement,
330 appname,
331 client_ip,
332 })
333}
334
335#[inline]
337pub(crate) fn parse_ep_field(ep_str: &str, raw: &str) -> Result<u8, ParseError> {
338 if !ep_str.starts_with("EP[") || !ep_str.ends_with(']') {
339 return Err(ParseError::InvalidEpFormat {
340 value: ep_str.to_string(),
341 raw: raw.to_string(),
342 });
343 }
344
345 let ep_num = &ep_str[3..ep_str.len() - 1];
346 ep_num.parse::<u8>().map_err(|_| ParseError::EpParseError {
347 value: ep_num.to_string(),
348 raw: raw.to_string(),
349 })
350}
351
352#[inline]
354pub(crate) fn extract_field_value(
355 field: &str,
356 prefix: &str,
357 raw: &str,
358) -> Result<String, ParseError> {
359 field
360 .strip_prefix(prefix)
361 .map(|s| s.to_string())
362 .ok_or_else(|| ParseError::InvalidFieldFormat {
363 expected: prefix.to_string(),
364 actual: field.to_string(),
365 raw: raw.to_string(),
366 })
367}
368
369pub(crate) fn parse_indicators(body: &str) -> Result<IndicatorsParts, ParseError> {
371 let exec_time_str = extract_indicator(body, EXECTIME_PREFIX, EXECTIME_SUFFIX)?;
373 let row_count_str = extract_indicator(body, ROWCOUNT_PREFIX, ROWCOUNT_SUFFIX)?;
374 let exec_id_str = extract_indicator(body, EXEC_ID_PREFIX, EXEC_ID_SUFFIX)?;
375
376 let execute_time =
377 exec_time_str
378 .parse::<f32>()
379 .map_err(|_| ParseError::IndicatorsParseError {
380 reason: format!("执行时间解析失败: {}", exec_time_str),
381 raw: body.to_string(),
382 })?;
383
384 let row_count = row_count_str
385 .parse::<u32>()
386 .map_err(|_| ParseError::IndicatorsParseError {
387 reason: format!("行数解析失败: {}", row_count_str),
388 raw: body.to_string(),
389 })?;
390
391 let execute_id = exec_id_str
392 .parse::<i64>()
393 .map_err(|_| ParseError::IndicatorsParseError {
394 reason: format!("执行 ID 解析失败: {}", exec_id_str),
395 raw: body.to_string(),
396 })?;
397
398 Ok(IndicatorsParts {
399 execute_time,
400 row_count,
401 execute_id,
402 })
403}
404
405#[inline]
407pub(crate) fn extract_indicator<'a>(
408 text: &'a str,
409 prefix: &str,
410 suffix: &str,
411) -> Result<&'a str, ParseError> {
412 let start_pos = text
413 .find(prefix)
414 .ok_or_else(|| ParseError::IndicatorsParseError {
415 reason: format!("未找到 {}", prefix),
416 raw: text.to_string(),
417 })?
418 + prefix.len();
419
420 let remaining = &text[start_pos..];
421 let end_offset = remaining
422 .find(suffix)
423 .ok_or_else(|| ParseError::IndicatorsParseError {
424 reason: format!("未找到 {}", suffix),
425 raw: text.to_string(),
426 })?;
427
428 Ok(remaining[..end_offset].trim())
429}