Skip to main content

dm_database_parser_sqllog/
record.rs

1use atoi::atoi;
2use memchr::memchr;
3use memchr::memrchr;
4
/// A single SQL log record.
///
/// Represents one complete SQL log entry; every field is filled in one pass at parse time.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Sqllog {
    /// Timestamp text in the form "YYYY-MM-DD HH:MM:SS.mmm".
    pub ts: String,

    /// Bracketed tag (for example `[SEL]` or `[ORA]`); `None` when the record has no tag.
    pub tag: Option<String>,

    // ── Metadata fields ──
    /// EP (Execution Point) number, range 0-255.
    pub ep: u8,

    /// Session ID.
    pub sess_id: String,

    /// Thread ID.
    pub thrd_id: String,

    /// User name.
    pub username: String,

    /// Transaction ID.
    pub trxid: String,

    /// Statement ID.
    pub statement: String,

    /// Application name.
    pub appname: String,

    /// Client IP address.
    pub client_ip: String,

    // ── SQL statement body ──
    /// SQL statement body.
    pub sql: String,

    // ── Performance indicators ──
    /// Execution time in milliseconds; `0.0` when the record carries no indicators.
    pub exectime: f32,

    /// Number of affected rows; `0` when the record carries no indicators.
    pub rowcount: u32,

    /// Execution ID; `0` when the record carries no indicators.
    pub exec_id: i64,
}
55
56/// 解析元数据:从 meta 字节切片中提取所有字段。
57///
58/// meta_bytes 必须为有效 UTF-8。
59pub(crate) fn parse_meta_from_bytes(
60    meta_bytes: &[u8],
61) -> (u8, String, String, String, String, String, String, String) {
62    let mut ep: u8 = 0;
63    let mut sess_id = String::new();
64    let mut thrd_id = String::new();
65    let mut username = String::new();
66    let mut trxid = String::new();
67    let mut statement = String::new();
68    let mut appname = String::new();
69    let mut client_ip = String::new();
70
71    let bytes = meta_bytes;
72    let len = bytes.len();
73    let mut idx = 0;
74
75    while idx < len {
76        // Skip whitespace
77        while idx < len && bytes[idx] == b' ' {
78            idx += 1;
79        }
80        if idx >= len {
81            break;
82        }
83
84        // Find token end
85        let start = idx;
86        while idx < len && bytes[idx] != b' ' {
87            idx += 1;
88        }
89        let part = &bytes[start..idx];
90
91        // Parse EP[n]
92        if part.len() > 4
93            && part[0] == b'E'
94            && part[1] == b'P'
95            && part[2] == b'['
96            && part[part.len() - 1] == b']'
97        {
98            if let Some(ep_val) = atoi::<u8>(&part[3..part.len() - 1]) {
99                ep = ep_val;
100            }
101            continue;
102        }
103
104        // Find ':'
105        if let Some(sep) = memchr(b':', part) {
106            let val_bytes = &part[sep + 1..];
107            let val = String::from_utf8_lossy(val_bytes).into_owned();
108
109            match &part[..sep] {
110                b"sess" => sess_id = val,
111                b"thrd" => thrd_id = val,
112                b"user" => username = val,
113                b"trxid" => trxid = val,
114                b"stmt" => statement = val,
115                b"ip" => client_ip = val,
116                b"appname" => {
117                    if !val_bytes.is_empty() {
118                        appname = val;
119                    } else {
120                        // Peek next token; treat it as appname only if it is not an ip field
121                        let mut peek = idx;
122                        while peek < len && bytes[peek] == b' ' {
123                            peek += 1;
124                        }
125                        if peek < len {
126                            let peek_start = peek;
127                            while peek < len && bytes[peek] != b' ' {
128                                peek += 1;
129                            }
130                            let next = &bytes[peek_start..peek];
131                            if !(next.starts_with(b"ip:") || next.starts_with(b"ip::")) {
132                                appname = String::from_utf8_lossy(next).into_owned();
133                                idx = peek;
134                            }
135                        }
136                    }
137                }
138                _ => {}
139            }
140        }
141    }
142
143    (
144        ep, sess_id, thrd_id, username, trxid, statement, appname, client_ip,
145    )
146}
147
148/// 解析性能指标:从 indicators 字节切片中提取 EXECTIME, ROWCOUNT, EXEC_ID。
149///
150/// 使用 memchr 扫描 ':' 和 '(' 定界符。
151pub(crate) fn parse_indicators_from_bytes(ind: &[u8]) -> (f32, u32, i64) {
152    if ind.is_empty() {
153        return (0.0, 0, 0);
154    }
155
156    let mut exectime: f32 = 0.0;
157    let mut rowcount: u32 = 0;
158    let mut exec_id: i64 = 0;
159
160    // Scan for EXECTIME
161    let mut search_start = 0;
162    while search_start < ind.len() {
163        if let Some(colon) = memchr(b':', &ind[search_start..]) {
164            let colon_pos = search_start + colon;
165            if colon_pos >= 8 && &ind[colon_pos - 8..colon_pos] == b"EXECTIME" {
166                let ss = colon_pos + 1;
167                if let Some(pi) = memchr(b'(', &ind[ss..]) {
168                    let val_bytes = &ind[ss..ss + pi];
169                    let val_str = String::from_utf8_lossy(val_bytes).trim_ascii().to_string();
170                    if let Ok(t) = val_str.parse::<f32>() {
171                        exectime = t;
172                    }
173                }
174                break;
175            }
176            search_start = colon_pos + 1;
177        } else {
178            break;
179        }
180    }
181
182    // Scan for ROWCOUNT
183    search_start = 0;
184    while search_start < ind.len() {
185        if let Some(colon) = memchr(b':', &ind[search_start..]) {
186            let colon_pos = search_start + colon;
187            if colon_pos >= 8 && &ind[colon_pos - 8..colon_pos] == b"ROWCOUNT" {
188                let ss = colon_pos + 1;
189                if let Some(pi) = memchr(b'(', &ind[ss..]) {
190                    let val_bytes = &ind[ss..ss + pi];
191                    let val_str = String::from_utf8_lossy(val_bytes).trim_ascii().to_string();
192                    if let Ok(r) = val_str.parse::<u32>() {
193                        rowcount = r;
194                    }
195                }
196                break;
197            }
198            search_start = colon_pos + 1;
199        } else {
200            break;
201        }
202    }
203
204    // Scan for EXEC_ID
205    search_start = 0;
206    while search_start < ind.len() {
207        if let Some(colon) = memchr(b':', &ind[search_start..]) {
208            let colon_pos = search_start + colon;
209            if colon_pos >= 7 && &ind[colon_pos - 7..colon_pos] == b"EXEC_ID" {
210                let ss = colon_pos + 1;
211                let end = memchr(b'.', &ind[ss..])
212                    .map(|i| ss + i)
213                    .unwrap_or(ind.len());
214                let val_bytes = &ind[ss..end];
215                let val_str = String::from_utf8_lossy(val_bytes).trim_ascii().to_string();
216                if let Ok(id) = val_str.parse::<i64>() {
217                    exec_id = id;
218                }
219                break;
220            }
221            search_start = colon_pos + 1;
222        } else {
223            break;
224        }
225    }
226
227    (exectime, rowcount, exec_id)
228}
229
230/// 在 indicator 字节中查找分割点(body 结束、indicators 开始的位置)。
231///
232/// 返回 body 的字节长度。
233pub(crate) fn find_indicators_split(data: &[u8]) -> usize {
234    let len = data.len();
235
236    // 快速早退:末尾不是 '.' 或 ')' 则无指标。
237    let last_meaningful = data
238        .iter()
239        .rev()
240        .find(|&&b| b != b'\n' && b != b'\r')
241        .copied();
242    if last_meaningful != Some(b'.') && last_meaningful != Some(b')') {
243        return len;
244    }
245
246    // 在末尾 256 字节窗口内反向扫描 ':' 找指标关键字。
247    let window_start = len.saturating_sub(256);
248    let window = &data[window_start..];
249
250    let mut exectime_pos: Option<usize> = None;
251    let mut rowcount_pos: Option<usize> = None;
252    let mut exec_id_pos: Option<usize> = None;
253    let mut search_end = window.len();
254    while search_end > 0 {
255        if exectime_pos.is_some() && rowcount_pos.is_some() && exec_id_pos.is_some() {
256            break;
257        }
258        match memrchr(b':', &window[..search_end]) {
259            None => break,
260            Some(colon) => {
261                if exectime_pos.is_none() && colon >= 8 && &window[colon - 8..colon] == b"EXECTIME"
262                {
263                    exectime_pos = Some(colon - 8);
264                } else if rowcount_pos.is_none()
265                    && colon >= 8
266                    && &window[colon - 8..colon] == b"ROWCOUNT"
267                {
268                    rowcount_pos = Some(colon - 8);
269                } else if exec_id_pos.is_none()
270                    && colon >= 7
271                    && &window[colon - 7..colon] == b"EXEC_ID"
272                {
273                    exec_id_pos = Some(colon - 7);
274                }
275                search_end = colon;
276            }
277        }
278    }
279
280    let earliest = [exectime_pos, rowcount_pos, exec_id_pos]
281        .into_iter()
282        .flatten()
283        .min();
284    match earliest {
285        Some(pos) => {
286            let split = window_start + pos;
287            // 验证守卫:假阳性时返回全文
288            let (_exectime, _rowcount, exec_id) = parse_indicators_from_bytes(&data[split..]);
289            if exec_id != 0 || _exectime != 0.0 || _rowcount != 0 {
290                split
291            } else {
292                len
293            }
294        }
295        None => len,
296    }
297}