Skip to main content

dm_database_parser_sqllog/
sqllog.rs

1use atoi::atoi;
2use memchr::memchr;
3use memchr::memrchr;
4
/// A single SQL log record.
///
/// Represents one complete SQL log record; all fields are populated in one go
/// while the record is parsed.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Sqllog {
    /// Timestamp, formatted as "YYYY-MM-DD HH:MM:SS.mmm"
    pub ts: String,

    /// Bracketed tag (for example `[SEL]`, `[ORA]`); `None` when absent
    pub tag: Option<String>,

    // ── Metadata fields ──
    /// EP (Execution Point) number, range 0-255
    pub ep: u8,

    /// Session ID
    pub sess_id: String,

    /// Thread ID
    pub thrd_id: String,

    /// User name
    pub username: String,

    /// Transaction ID
    pub trxid: String,

    /// Statement ID
    pub statement: String,

    /// Application name
    pub appname: String,

    /// Client IP address
    pub client_ip: String,

    // ── SQL statement body ──
    /// SQL statement body
    pub sql: String,

    // ── Performance indicators ──
    /// Execution time (milliseconds); 0.0 when there are no indicators
    pub exectime: f32,

    /// Number of affected rows; 0 when there are no indicators
    pub rowcount: u32,

    /// Execution ID; 0 when there are no indicators
    pub exec_id: i64,
}
55
56/// 解析元数据:从 meta 字节切片中提取所有字段。
57///
58/// meta_bytes 必须为有效 UTF-8。
59pub(crate) fn parse_meta_from_bytes(meta_bytes: &[u8]) -> (u8, String, String, String, String, String, String, String) {
60    let mut ep: u8 = 0;
61    let mut sess_id = String::new();
62    let mut thrd_id = String::new();
63    let mut username = String::new();
64    let mut trxid = String::new();
65    let mut statement = String::new();
66    let mut appname = String::new();
67    let mut client_ip = String::new();
68
69    let bytes = meta_bytes;
70    let len = bytes.len();
71    let mut idx = 0;
72
73    while idx < len {
74        // Skip whitespace
75        while idx < len && bytes[idx] == b' ' {
76            idx += 1;
77        }
78        if idx >= len {
79            break;
80        }
81
82        // Find token end
83        let start = idx;
84        while idx < len && bytes[idx] != b' ' {
85            idx += 1;
86        }
87        let part = &bytes[start..idx];
88
89        // Parse EP[n]
90        if part.len() > 4
91            && part[0] == b'E'
92            && part[1] == b'P'
93            && part[2] == b'['
94            && part[part.len() - 1] == b']'
95        {
96            if let Some(ep_val) = atoi::<u8>(&part[3..part.len() - 1]) {
97                ep = ep_val;
98            }
99            continue;
100        }
101
102        // Find ':'
103        if let Some(sep) = memchr(b':', part) {
104            let val_bytes = &part[sep + 1..];
105            let val = String::from_utf8_lossy(val_bytes).into_owned();
106
107            match &part[..sep] {
108                b"sess" => sess_id = val,
109                b"thrd" => thrd_id = val,
110                b"user" => username = val,
111                b"trxid" => trxid = val,
112                b"stmt" => statement = val,
113                b"ip" => client_ip = val,
114                b"appname" => {
115                    if !val_bytes.is_empty() {
116                        appname = val;
117                    } else {
118                        // Peek next token; treat it as appname only if it is not an ip field
119                        let mut peek = idx;
120                        while peek < len && bytes[peek] == b' ' {
121                            peek += 1;
122                        }
123                        if peek < len {
124                            let peek_start = peek;
125                            while peek < len && bytes[peek] != b' ' {
126                                peek += 1;
127                            }
128                            let next = &bytes[peek_start..peek];
129                            if !(next.starts_with(b"ip:") || next.starts_with(b"ip::")) {
130                                appname = String::from_utf8_lossy(next).into_owned();
131                                idx = peek;
132                            }
133                        }
134                    }
135                }
136                _ => {}
137            }
138        }
139    }
140
141    (ep, sess_id, thrd_id, username, trxid, statement, appname, client_ip)
142}
143
144/// 解析性能指标:从 indicators 字节切片中提取 EXECTIME, ROWCOUNT, EXEC_ID。
145///
146/// 使用 memchr 扫描 ':' 和 '(' 定界符。
147pub(crate) fn parse_indicators_from_bytes(ind: &[u8]) -> (f32, u32, i64) {
148    if ind.is_empty() {
149        return (0.0, 0, 0);
150    }
151
152    let mut exectime: f32 = 0.0;
153    let mut rowcount: u32 = 0;
154    let mut exec_id: i64 = 0;
155
156    // Scan for EXECTIME
157    let mut search_start = 0;
158    while search_start < ind.len() {
159        if let Some(colon) = memchr(b':', &ind[search_start..]) {
160            let colon_pos = search_start + colon;
161            if colon_pos >= 8 && &ind[colon_pos - 8..colon_pos] == b"EXECTIME" {
162                let ss = colon_pos + 1;
163                if let Some(pi) = memchr(b'(', &ind[ss..]) {
164                    let val_bytes = &ind[ss..ss + pi];
165                    let val_str = String::from_utf8_lossy(val_bytes).trim_ascii().to_string();
166                    if let Ok(t) = val_str.parse::<f32>() {
167                        exectime = t;
168                    }
169                }
170                break;
171            }
172            search_start = colon_pos + 1;
173        } else {
174            break;
175        }
176    }
177
178    // Scan for ROWCOUNT
179    search_start = 0;
180    while search_start < ind.len() {
181        if let Some(colon) = memchr(b':', &ind[search_start..]) {
182            let colon_pos = search_start + colon;
183            if colon_pos >= 8 && &ind[colon_pos - 8..colon_pos] == b"ROWCOUNT" {
184                let ss = colon_pos + 1;
185                if let Some(pi) = memchr(b'(', &ind[ss..]) {
186                    let val_bytes = &ind[ss..ss + pi];
187                    let val_str = String::from_utf8_lossy(val_bytes).trim_ascii().to_string();
188                    if let Ok(r) = val_str.parse::<u32>() {
189                        rowcount = r;
190                    }
191                }
192                break;
193            }
194            search_start = colon_pos + 1;
195        } else {
196            break;
197        }
198    }
199
200    // Scan for EXEC_ID
201    search_start = 0;
202    while search_start < ind.len() {
203        if let Some(colon) = memchr(b':', &ind[search_start..]) {
204            let colon_pos = search_start + colon;
205            if colon_pos >= 7 && &ind[colon_pos - 7..colon_pos] == b"EXEC_ID" {
206                let ss = colon_pos + 1;
207                let end = memchr(b'.', &ind[ss..]).map(|i| ss + i).unwrap_or(ind.len());
208                let val_bytes = &ind[ss..end];
209                let val_str = String::from_utf8_lossy(val_bytes).trim_ascii().to_string();
210                if let Ok(id) = val_str.parse::<i64>() {
211                    exec_id = id;
212                }
213                break;
214            }
215            search_start = colon_pos + 1;
216        } else {
217            break;
218        }
219    }
220
221    (exectime, rowcount, exec_id)
222}
223
224/// 在 indicator 字节中查找分割点(body 结束、indicators 开始的位置)。
225///
226/// 返回 body 的字节长度。
227pub(crate) fn find_indicators_split(data: &[u8]) -> usize {
228    let len = data.len();
229
230    // 快速早退:末尾不是 '.' 或 ')' 则无指标。
231    let last_meaningful = data
232        .iter()
233        .rev()
234        .find(|&&b| b != b'\n' && b != b'\r')
235        .copied();
236    if last_meaningful != Some(b'.') && last_meaningful != Some(b')') {
237        return len;
238    }
239
240    // 在末尾 256 字节窗口内反向扫描 ':' 找指标关键字。
241    let window_start = len.saturating_sub(256);
242    let window = &data[window_start..];
243
244    let mut exectime_pos: Option<usize> = None;
245    let mut rowcount_pos: Option<usize> = None;
246    let mut exec_id_pos: Option<usize> = None;
247    let mut search_end = window.len();
248    while search_end > 0 {
249        if exectime_pos.is_some() && rowcount_pos.is_some() && exec_id_pos.is_some() {
250            break;
251        }
252        match memrchr(b':', &window[..search_end]) {
253            None => break,
254            Some(colon) => {
255                if exectime_pos.is_none() && colon >= 8 && &window[colon - 8..colon] == b"EXECTIME" {
256                    exectime_pos = Some(colon - 8);
257                } else if rowcount_pos.is_none() && colon >= 8 && &window[colon - 8..colon] == b"ROWCOUNT" {
258                    rowcount_pos = Some(colon - 8);
259                } else if exec_id_pos.is_none() && colon >= 7 && &window[colon - 7..colon] == b"EXEC_ID" {
260                    exec_id_pos = Some(colon - 7);
261                }
262                search_end = colon;
263            }
264        }
265    }
266
267    let earliest = [exectime_pos, rowcount_pos, exec_id_pos]
268        .into_iter()
269        .flatten()
270        .min();
271    match earliest {
272        Some(pos) => {
273            let split = window_start + pos;
274            // 验证守卫:假阳性时返回全文
275            let (_exectime, _rowcount, exec_id) = parse_indicators_from_bytes(&data[split..]);
276            if exec_id != 0 || _exectime != 0.0 || _rowcount != 0 {
277                split
278            } else {
279                len
280            }
281        }
282        None => len,
283    }
284}