dm_database_parser_sqllog/parser/
parse_functions.rs

1//! 核心解析函数
2//!
3//! 包含了所有用于解析 SQL 日志的核心函数,如解析记录、元数据、指标等。
4
5use crate::error::ParseError;
6use crate::parser::constants::*;
7use crate::sqllog::{IndicatorsParts, MetaParts, Sqllog};
8use crate::tools::is_record_start_line;
9
10/// 从行数组解析成 Sqllog 结构
11///
12/// 这是主要的解析函数,将一行或多行文本解析为结构化的 `Sqllog` 对象。
13///
14/// # 参数
15///
16/// * `lines` - 包含日志记录的行(第一行必须是有效的起始行,后续行是继续行)
17///
18/// # 返回
19///
20/// * `Ok(Sqllog)` - 解析成功
21/// * `Err(ParseError)` - 解析失败,包含详细的错误信息
22///
23/// # 错误
24///
25/// 可能返回以下错误:
26/// - `EmptyInput` - 输入为空
27/// - `InvalidRecordStartLine` - 第一行不是有效的记录起始行
28/// - `LineTooShort` - 行长度不足
29/// - `MissingClosingParen` - 缺少右括号
30/// - `InsufficientMetaFields` - Meta 字段数量不足
31///
32/// # 示例
33///
34/// ```
35/// use dm_database_parser_sqllog::parse_record;
36///
37/// let lines = vec!["2025-08-12 10:57:09.548 (EP[0] sess:123 thrd:456 user:alice trxid:789 stmt:999 appname:app) SELECT 1"];
38/// let sqllog = parse_record(&lines).unwrap();
39///
40/// assert_eq!(sqllog.ts, "2025-08-12 10:57:09.548");
41/// assert_eq!(sqllog.meta.username, "alice");
42/// assert_eq!(sqllog.body, "SELECT 1");
43/// ```
44pub fn parse_record(lines: &[&str]) -> Result<Sqllog, ParseError> {
45    if lines.is_empty() {
46        return Err(ParseError::EmptyInput);
47    }
48
49    let first_line = lines[0];
50
51    // 验证第一行格式
52    if !is_record_start_line(first_line) {
53        return Err(ParseError::InvalidRecordStartLine);
54    }
55
56    // 验证行长度
57    if first_line.len() < MIN_RECORD_LENGTH {
58        return Err(ParseError::LineTooShort(first_line.len()));
59    }
60
61    // 解析时间戳
62    let ts = &first_line[0..TIMESTAMP_LENGTH];
63
64    // 查找 meta 部分的右括号
65    let closing_paren = first_line
66        .find(')')
67        .ok_or(ParseError::MissingClosingParen)?;
68
69    if closing_paren <= META_START_INDEX {
70        return Err(ParseError::InsufficientMetaFields(0));
71    }
72
73    // 解析 meta 部分
74    let meta_str = &first_line[META_START_INDEX..closing_paren];
75    let meta = parse_meta(meta_str)?;
76
77    // 构建 body(包含继续行)
78    let body_start = closing_paren + BODY_OFFSET;
79    let full_body = build_body(first_line, body_start, &lines[1..]);
80
81    // 尝试解析 indicators(可选)
82    let indicators = parse_indicators(&full_body).ok();
83
84    // 提取纯 SQL body(移除 indicators)
85    let body = if indicators.is_some() {
86        extract_sql_body(&full_body)
87    } else {
88        full_body
89    };
90
91    Ok(Sqllog {
92        ts: ts.to_string(),
93        meta,
94        body,
95        indicators,
96    })
97}
98
99/// 构建完整的 body(包含所有继续行)
100///
101/// 将第一行的 body 部分和所有继续行拼接成完整的 SQL 语句体。
102/// 使用预分配内存优化性能。
103///
104/// # 参数
105///
106/// * `first_line` - 起始行
107/// * `body_start` - body 在起始行中的起始位置
108/// * `continuation_lines` - 所有继续行
109///
110/// # 返回
111///
112/// 返回拼接后的完整 body 字符串
113#[inline]
114pub(crate) fn build_body(
115    first_line: &str,
116    body_start: usize,
117    continuation_lines: &[&str],
118) -> String {
119    if continuation_lines.is_empty() {
120        // 只有单行
121        if body_start < first_line.len() {
122            first_line[body_start..].to_string()
123        } else {
124            String::new()
125        }
126    } else {
127        // 有多行,计算总容量并预分配
128        let has_first_part = body_start < first_line.len();
129        let first_part_len = if has_first_part {
130            first_line.len() - body_start
131        } else {
132            0
133        };
134
135        let newline_count = if has_first_part {
136            continuation_lines.len()
137        } else {
138            continuation_lines.len() - 1
139        };
140
141        let total_len = first_part_len
142            + continuation_lines.iter().map(|s| s.len()).sum::<usize>()
143            + newline_count;
144
145        let mut result = String::with_capacity(total_len);
146
147        if has_first_part {
148            result.push_str(&first_line[body_start..]);
149            for line in continuation_lines {
150                result.push('\n');
151                result.push_str(line);
152            }
153        } else {
154            // 第一行为空,从第一个 continuation_line 开始
155            result.push_str(continuation_lines[0]);
156            for line in &continuation_lines[1..] {
157                result.push('\n');
158                result.push_str(line);
159            }
160        }
161
162        result
163    }
164}
165
166/// 从 full_body 中提取 SQL 部分(移除 indicators)
167#[inline]
168pub(crate) fn extract_sql_body(full_body: &str) -> String {
169    // 使用预定义的 INDICATOR_PATTERNS 避免每次创建数组
170    INDICATOR_PATTERNS
171        .iter()
172        .filter_map(|pattern| full_body.find(pattern))
173        .min()
174        .map(|pos| full_body[..pos].trim_end().to_string())
175        .unwrap_or_else(|| full_body.to_string())
176}
177
178/// 解析 meta 字符串
179pub(crate) fn parse_meta(meta_str: &str) -> Result<MetaParts, ParseError> {
180    let fields: Vec<&str> = meta_str.split(' ').collect();
181
182    if fields.len() < 7 {
183        return Err(ParseError::InsufficientMetaFields(fields.len()));
184    }
185
186    // 解析 EP
187    let ep = parse_ep_field(fields[0])?;
188
189    // 解析必需字段 - 使用静态常量避免字符串字面量重复
190    let sess_id = extract_field_value(fields[1], SESS_PREFIX)?;
191    let thrd_id = extract_field_value(fields[2], THRD_PREFIX)?;
192    let username = extract_field_value(fields[3], USER_PREFIX)?;
193    let trxid = extract_field_value(fields[4], TRXID_PREFIX)?;
194    let statement = extract_field_value(fields[5], STMT_PREFIX)?;
195    let appname = extract_field_value(fields[6], APPNAME_PREFIX)?;
196
197    // 可选的 client_ip
198    let client_ip = fields
199        .get(7)
200        .map(|field| extract_field_value(field, IP_PREFIX))
201        .transpose()?
202        .unwrap_or_default();
203
204    Ok(MetaParts {
205        ep,
206        sess_id,
207        thrd_id,
208        username,
209        trxid,
210        statement,
211        appname,
212        client_ip,
213    })
214}
215
216/// 解析 EP 字段
217#[inline]
218pub(crate) fn parse_ep_field(ep_str: &str) -> Result<u8, ParseError> {
219    if !ep_str.starts_with("EP[") || !ep_str.ends_with(']') {
220        return Err(ParseError::InvalidEpFormat(ep_str.to_string()));
221    }
222
223    let ep_num = &ep_str[3..ep_str.len() - 1];
224    ep_num
225        .parse::<u8>()
226        .map_err(|_| ParseError::EpParseError(ep_num.to_string()))
227}
228
229/// 从字段中提取值
230#[inline]
231pub(crate) fn extract_field_value(field: &str, prefix: &str) -> Result<String, ParseError> {
232    field
233        .strip_prefix(prefix)
234        .map(|s| s.to_string())
235        .ok_or_else(|| ParseError::InvalidFieldFormat {
236            expected: prefix.to_string(),
237            actual: field.to_string(),
238        })
239}
240
241/// 解析 indicators 部分
242pub(crate) fn parse_indicators(body: &str) -> Result<IndicatorsParts, ParseError> {
243    // 使用预定义的静态常量,避免每次创建字符串
244    let exec_time_str = extract_indicator(body, EXECTIME_PREFIX, EXECTIME_SUFFIX)?;
245    let row_count_str = extract_indicator(body, ROWCOUNT_PREFIX, ROWCOUNT_SUFFIX)?;
246    let exec_id_str = extract_indicator(body, EXEC_ID_PREFIX, EXEC_ID_SUFFIX)?;
247
248    let execute_time = exec_time_str.parse::<f32>().map_err(|_| {
249        ParseError::IndicatorsParseError(format!("执行时间解析失败: {}", exec_time_str))
250    })?;
251
252    let row_count = row_count_str.parse::<u32>().map_err(|_| {
253        ParseError::IndicatorsParseError(format!("行数解析失败: {}", row_count_str))
254    })?;
255
256    let execute_id = exec_id_str.parse::<i64>().map_err(|_| {
257        ParseError::IndicatorsParseError(format!("执行 ID 解析失败: {}", exec_id_str))
258    })?;
259
260    Ok(IndicatorsParts {
261        execute_time,
262        row_count,
263        execute_id,
264    })
265}
266
267/// 提取 indicator 值
268#[inline]
269pub(crate) fn extract_indicator<'a>(
270    text: &'a str,
271    prefix: &str,
272    suffix: &str,
273) -> Result<&'a str, ParseError> {
274    let start_pos = text
275        .find(prefix)
276        .ok_or_else(|| ParseError::IndicatorsParseError(format!("未找到 {}", prefix)))?
277        + prefix.len();
278
279    let remaining = &text[start_pos..];
280    let end_offset = remaining
281        .find(suffix)
282        .ok_or_else(|| ParseError::IndicatorsParseError(format!("未找到 {}", suffix)))?;
283
284    Ok(remaining[..end_offset].trim())
285}