dm_database_parser_sqllog/parser/
parse_functions.rs

1//! 核心解析函数
2//!
3//! 包含了所有用于解析 SQL 日志的核心函数,如解析记录、元数据、指标等。
4
5use crate::error::ParseError;
6use crate::parser::constants::*;
7use crate::sqllog::{IndicatorsParts, MetaParts, Sqllog};
8use crate::tools::is_record_start_line;
9
10/// 从行数组解析成 Sqllog 结构
11///
12/// 这是主要的解析函数,将一行或多行文本解析为结构化的 `Sqllog` 对象。
13///
14/// # 参数
15///
16/// * `lines` - 包含日志记录的行(第一行必须是有效的起始行,后续行是继续行)
17///
18/// # 返回
19///
20/// * `Ok(Sqllog)` - 解析成功
21/// * `Err(ParseError)` - 解析失败,包含详细的错误信息
22///
23/// # 错误
24///
25/// 可能返回以下错误:
26/// - `EmptyInput` - 输入为空
27/// - `InvalidRecordStartLine` - 第一行不是有效的记录起始行
28/// - `LineTooShort` - 行长度不足
29/// - `MissingClosingParen` - 缺少右括号
30/// - `InsufficientMetaFields` - Meta 字段数量不足
31///
32/// # 示例
33///
34/// ```
35/// use dm_database_parser_sqllog::parse_record;
36///
37/// let lines = vec!["2025-08-12 10:57:09.548 (EP[0] sess:123 thrd:456 user:alice trxid:789 stmt:999 appname:app) SELECT 1"];
38/// let sqllog = parse_record(&lines).unwrap();
39///
40/// assert_eq!(sqllog.ts, "2025-08-12 10:57:09.548");
41/// assert_eq!(sqllog.meta.username, "alice");
42/// assert_eq!(sqllog.body, "SELECT 1");
43/// ```
44pub fn parse_record(lines: &[&str]) -> Result<Sqllog, ParseError> {
45    if lines.is_empty() {
46        return Err(ParseError::EmptyInput);
47    }
48
49    let first_line = lines[0];
50
51    // 验证第一行格式
52    if !is_record_start_line(first_line) {
53        return Err(ParseError::InvalidRecordStartLine {
54            raw: first_line.to_string(),
55        });
56    }
57
58    // 验证行长度
59    if first_line.len() < MIN_RECORD_LENGTH {
60        return Err(ParseError::LineTooShort {
61            length: first_line.len(),
62            raw: first_line.to_string(),
63        });
64    }
65
66    // 解析时间戳
67    let ts = &first_line[0..TIMESTAMP_LENGTH];
68
69    // 查找 meta 部分的右括号
70    let closing_paren = first_line
71        .find(')')
72        .ok_or_else(|| ParseError::MissingClosingParen {
73            raw: first_line.to_string(),
74        })?;
75
76    if closing_paren <= META_START_INDEX {
77        return Err(ParseError::InsufficientMetaFields {
78            count: 0,
79            raw: first_line[META_START_INDEX..].to_string(),
80        });
81    }
82
83    // 解析 meta 部分
84    let meta_str = &first_line[META_START_INDEX..closing_paren];
85    let meta = parse_meta(meta_str)?;
86
87    // 构建 body(包含继续行)
88    let body_start = closing_paren + BODY_OFFSET;
89    let full_body = build_body(first_line, body_start, &lines[1..]);
90
91    // 尝试解析 indicators(可选)
92    let indicators = parse_indicators(&full_body).ok();
93
94    // 提取纯 SQL body(移除 indicators)
95    let body = if indicators.is_some() {
96        extract_sql_body(&full_body)
97    } else {
98        full_body
99    };
100
101    Ok(Sqllog {
102        ts: ts.to_string(),
103        meta,
104        body,
105        indicators,
106    })
107}
108
109/// 构建完整的 body(包含所有继续行)
110///
111/// 将第一行的 body 部分和所有继续行拼接成完整的 SQL 语句体。
112/// 使用预分配内存优化性能。
113///
114/// # 参数
115///
116/// * `first_line` - 起始行
117/// * `body_start` - body 在起始行中的起始位置
118/// * `continuation_lines` - 所有继续行
119///
120/// # 返回
121///
122/// 返回拼接后的完整 body 字符串
123#[inline]
124pub(crate) fn build_body(
125    first_line: &str,
126    body_start: usize,
127    continuation_lines: &[&str],
128) -> String {
129    if continuation_lines.is_empty() {
130        // 只有单行
131        if body_start < first_line.len() {
132            first_line[body_start..].to_string()
133        } else {
134            String::new()
135        }
136    } else {
137        // 有多行,计算总容量并预分配
138        let has_first_part = body_start < first_line.len();
139        let first_part_len = if has_first_part {
140            first_line.len() - body_start
141        } else {
142            0
143        };
144
145        let newline_count = if has_first_part {
146            continuation_lines.len()
147        } else {
148            continuation_lines.len() - 1
149        };
150
151        let total_len = first_part_len
152            + continuation_lines.iter().map(|s| s.len()).sum::<usize>()
153            + newline_count;
154
155        let mut result = String::with_capacity(total_len);
156
157        if has_first_part {
158            result.push_str(&first_line[body_start..]);
159            for line in continuation_lines {
160                result.push('\n');
161                result.push_str(line);
162            }
163        } else {
164            // 第一行为空,从第一个 continuation_line 开始
165            result.push_str(continuation_lines[0]);
166            for line in &continuation_lines[1..] {
167                result.push('\n');
168                result.push_str(line);
169            }
170        }
171
172        result
173    }
174}
175
176/// 从 full_body 中提取 SQL 部分(移除 indicators)
177#[inline]
178pub(crate) fn extract_sql_body(full_body: &str) -> String {
179    // 使用预定义的 INDICATOR_PATTERNS 避免每次创建数组
180    INDICATOR_PATTERNS
181        .iter()
182        .filter_map(|pattern| full_body.find(pattern))
183        .min()
184        .map(|pos| full_body[..pos].trim_end().to_string())
185        .unwrap_or_else(|| full_body.to_string())
186}
187
188/// 解析 meta 字符串
189pub(crate) fn parse_meta(meta_str: &str) -> Result<MetaParts, ParseError> {
190    // 使用前缀定位法而非简单分割,以正确处理 appname 中的空格
191
192    // 解析 EP - 从头开始
193    let ep_end = meta_str
194        .find(' ')
195        .ok_or(ParseError::InsufficientMetaFields {
196            count: 0,
197            raw: meta_str.to_string(),
198        })?;
199    let ep = parse_ep_field(&meta_str[..ep_end], meta_str)?;
200
201    // 解析 sess
202    let sess_start = ep_end + 1;
203    let sess_end = meta_str[sess_start..]
204        .find(' ')
205        .ok_or(ParseError::InsufficientMetaFields {
206            count: 1,
207            raw: meta_str.to_string(),
208        })?
209        + sess_start;
210    let sess_id = extract_field_value(&meta_str[sess_start..sess_end], SESS_PREFIX, meta_str)?;
211
212    // 解析 thrd
213    let thrd_start = sess_end + 1;
214    let thrd_end = meta_str[thrd_start..]
215        .find(' ')
216        .ok_or(ParseError::InsufficientMetaFields {
217            count: 2,
218            raw: meta_str.to_string(),
219        })?
220        + thrd_start;
221    let thrd_id = extract_field_value(&meta_str[thrd_start..thrd_end], THRD_PREFIX, meta_str)?;
222
223    // 解析 user
224    let user_start = thrd_end + 1;
225    let user_end = meta_str[user_start..]
226        .find(' ')
227        .ok_or(ParseError::InsufficientMetaFields {
228            count: 3,
229            raw: meta_str.to_string(),
230        })?
231        + user_start;
232    let username = extract_field_value(&meta_str[user_start..user_end], USER_PREFIX, meta_str)?;
233
234    // 解析 trxid
235    let trxid_start = user_end + 1;
236    let trxid_end_result = meta_str[trxid_start..].find(' ');
237    let (trxid, after_trxid) = if let Some(trxid_end_offset) = trxid_end_result {
238        let trxid_end = trxid_start + trxid_end_offset;
239        (
240            extract_field_value(&meta_str[trxid_start..trxid_end], TRXID_PREFIX, meta_str)?,
241            trxid_end + 1,
242        )
243    } else {
244        // 没有更多字段,trxid 是最后一个字段(只有 5 个字段)
245        (
246            extract_field_value(&meta_str[trxid_start..], TRXID_PREFIX, meta_str)?,
247            meta_str.len(),
248        )
249    };
250
251    // 如果只有 5 个字段,返回默认值
252    if after_trxid >= meta_str.len() {
253        return Ok(MetaParts {
254            ep,
255            sess_id,
256            thrd_id,
257            username,
258            trxid,
259            statement: String::new(),
260            appname: String::new(),
261            client_ip: String::new(),
262        });
263    }
264
265    // 解析 stmt(可能不存在)
266    let stmt_start = after_trxid;
267    let stmt_end_result = meta_str[stmt_start..].find(' ');
268    let (statement, after_stmt) = if let Some(stmt_end_offset) = stmt_end_result {
269        let stmt_end = stmt_start + stmt_end_offset;
270        (
271            extract_field_value(&meta_str[stmt_start..stmt_end], STMT_PREFIX, meta_str)?,
272            stmt_end + 1,
273        )
274    } else {
275        // 没有更多字段,stmt 是最后一个字段(只有 6 个字段)
276        (
277            extract_field_value(&meta_str[stmt_start..], STMT_PREFIX, meta_str)?,
278            meta_str.len(),
279        )
280    };
281
282    // 如果只有 6 个字段,返回默认 appname 和 client_ip
283    if after_stmt >= meta_str.len() {
284        return Ok(MetaParts {
285            ep,
286            sess_id,
287            thrd_id,
288            username,
289            trxid,
290            statement,
291            appname: String::new(),
292            client_ip: String::new(),
293        });
294    }
295
296    // 解析 appname(可选,且值可能包含空格)
297    let appname_start = after_stmt;
298    let (appname, client_ip) = if appname_start < meta_str.len() {
299        // 检查是否有 appname 字段
300        if meta_str[appname_start..].starts_with(APPNAME_PREFIX) {
301            // 找到 appname,需要确定其结束位置
302            // appname 后面可能跟着 " ip:::ffff:" 或者直接结束
303            let appname_value_start = appname_start + APPNAME_PREFIX.len();
304            if let Some(ip_pos) = meta_str[appname_value_start..].find(" ip:::ffff:") {
305                // 有 IP 字段
306                let appname_value = &meta_str[appname_value_start..appname_value_start + ip_pos];
307                let ip_start = appname_value_start + ip_pos + 1;
308                let client_ip = extract_field_value(&meta_str[ip_start..], IP_PREFIX, meta_str)?;
309                (appname_value.to_string(), client_ip)
310            } else {
311                // 没有 IP 字段,appname 到末尾
312                let appname_value = &meta_str[appname_value_start..];
313                (appname_value.to_string(), String::new())
314            }
315        } else {
316            // 没有 appname 字段
317            (String::new(), String::new())
318        }
319    } else {
320        (String::new(), String::new())
321    };
322
323    Ok(MetaParts {
324        ep,
325        sess_id,
326        thrd_id,
327        username,
328        trxid,
329        statement,
330        appname,
331        client_ip,
332    })
333}
334
335/// 解析 EP 字段
336#[inline]
337pub(crate) fn parse_ep_field(ep_str: &str, raw: &str) -> Result<u8, ParseError> {
338    if !ep_str.starts_with("EP[") || !ep_str.ends_with(']') {
339        return Err(ParseError::InvalidEpFormat {
340            value: ep_str.to_string(),
341            raw: raw.to_string(),
342        });
343    }
344
345    let ep_num = &ep_str[3..ep_str.len() - 1];
346    ep_num.parse::<u8>().map_err(|_| ParseError::EpParseError {
347        value: ep_num.to_string(),
348        raw: raw.to_string(),
349    })
350}
351
352/// 从字段中提取值
353#[inline]
354pub(crate) fn extract_field_value(
355    field: &str,
356    prefix: &str,
357    raw: &str,
358) -> Result<String, ParseError> {
359    field
360        .strip_prefix(prefix)
361        .map(|s| s.to_string())
362        .ok_or_else(|| ParseError::InvalidFieldFormat {
363            expected: prefix.to_string(),
364            actual: field.to_string(),
365            raw: raw.to_string(),
366        })
367}
368
369/// 解析 indicators 部分
370pub(crate) fn parse_indicators(body: &str) -> Result<IndicatorsParts, ParseError> {
371    // 使用预定义的静态常量,避免每次创建字符串
372    let exec_time_str = extract_indicator(body, EXECTIME_PREFIX, EXECTIME_SUFFIX)?;
373    let row_count_str = extract_indicator(body, ROWCOUNT_PREFIX, ROWCOUNT_SUFFIX)?;
374    let exec_id_str = extract_indicator(body, EXEC_ID_PREFIX, EXEC_ID_SUFFIX)?;
375
376    let execute_time =
377        exec_time_str
378            .parse::<f32>()
379            .map_err(|_| ParseError::IndicatorsParseError {
380                reason: format!("执行时间解析失败: {}", exec_time_str),
381                raw: body.to_string(),
382            })?;
383
384    let row_count = row_count_str
385        .parse::<u32>()
386        .map_err(|_| ParseError::IndicatorsParseError {
387            reason: format!("行数解析失败: {}", row_count_str),
388            raw: body.to_string(),
389        })?;
390
391    let execute_id = exec_id_str
392        .parse::<i64>()
393        .map_err(|_| ParseError::IndicatorsParseError {
394            reason: format!("执行 ID 解析失败: {}", exec_id_str),
395            raw: body.to_string(),
396        })?;
397
398    Ok(IndicatorsParts {
399        execute_time,
400        row_count,
401        execute_id,
402    })
403}
404
405/// 提取 indicator 值
406#[inline]
407pub(crate) fn extract_indicator<'a>(
408    text: &'a str,
409    prefix: &str,
410    suffix: &str,
411) -> Result<&'a str, ParseError> {
412    let start_pos = text
413        .find(prefix)
414        .ok_or_else(|| ParseError::IndicatorsParseError {
415            reason: format!("未找到 {}", prefix),
416            raw: text.to_string(),
417        })?
418        + prefix.len();
419
420    let remaining = &text[start_pos..];
421    let end_offset = remaining
422        .find(suffix)
423        .ok_or_else(|| ParseError::IndicatorsParseError {
424            reason: format!("未找到 {}", suffix),
425            raw: text.to_string(),
426        })?;
427
428    Ok(remaining[..end_offset].trim())
429}