Skip to main content

dm_database_sqllog2db/stats/
aggregate.rs

//! 统计聚合层:慢 SQL 最小堆 + 高频 SQL `HashMap`,支持单次扫描双侧聚合。
2
3use std::cmp::{Ordering, Reverse};
4use std::collections::{BinaryHeap, HashMap};
5
/// One slow-SQL output row (written to CSV or `SQLite`).
///
/// Produced by `StatsAccumulator::into_results`, ordered slowest first.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SlowSqlRow {
    /// The SQL text exactly as it appeared in the log record (not normalized).
    pub sql_text: String,
    /// Execution time in milliseconds, converted to an integer for export.
    pub elapsed_ms: i64,
    /// Timestamp string of the log record, copied verbatim.
    pub timestamp: String,
}
13
/// One frequent-SQL output row (written to CSV or `SQLite`).
///
/// Produced by `StatsAccumulator::into_results`, ordered by `call_count` descending.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FrequentSqlRow {
    /// Normalized SQL template that all counted records mapped to.
    pub normalized_sql: String,
    /// Number of in-range records that mapped to this template.
    pub call_count: u64,
    /// Mean execution time across those records, in milliseconds.
    pub avg_elapsed_ms: i64,
    /// Largest execution time across those records, in milliseconds.
    pub max_elapsed_ms: i64,
}
22
/// Internal slow-SQL heap entry.
///
/// The elapsed time is kept as the raw `f32` from the log so ordering inside the
/// heap is exact; it is converted with `f32_ms_to_i64` only when rows are built.
#[derive(Debug)]
struct SlowSqlEntry {
    /// Original SQL text of the record.
    sql_text: String,
    /// Raw execution time in milliseconds; the only field used for ordering.
    elapsed_ms: f32,
    /// Timestamp string of the record.
    timestamp: String,
}
30
31impl PartialEq for SlowSqlEntry {
32    fn eq(&self, other: &Self) -> bool {
33        self.elapsed_ms.total_cmp(&other.elapsed_ms) == Ordering::Equal
34    }
35}
36
37impl Eq for SlowSqlEntry {}
38
39impl PartialOrd for SlowSqlEntry {
40    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
41        Some(self.cmp(other))
42    }
43}
44
45impl Ord for SlowSqlEntry {
46    fn cmp(&self, other: &Self) -> Ordering {
47        self.elapsed_ms.total_cmp(&other.elapsed_ms)
48    }
49}
50
/// Running aggregate for one normalized SQL template (value type of the
/// frequent-SQL `HashMap`).
#[derive(Debug)]
struct AggState {
    /// Number of records folded into this aggregate so far.
    call_count: u64,
    /// Sum of execution times in milliseconds, accumulated in `f64` to limit
    /// rounding drift over many additions.
    total_elapsed: f64,
    /// Largest execution time seen so far, in milliseconds.
    max_elapsed: f32,
}
58
59/// 统计聚合器:单次扫描同时聚合慢 SQL(最小堆)与高频 SQL(`HashMap`)。
60#[derive(Debug)]
61pub struct StatsAccumulator {
62    slow_heap: BinaryHeap<Reverse<SlowSqlEntry>>,
63    freq_map: HashMap<String, AggState>,
64    top_n: usize,
65    from: Option<String>,
66    to: Option<String>,
67}
68
69impl StatsAccumulator {
70    /// 创建新的聚合器,`top_n` 为输出行数上限(≥ 1),`from`/`to` 为时间段过滤范围(均 None 时不过滤)。
71    ///
72    /// # Panics
73    ///
74    /// 当 `top_n` 为 0 时 panic。
75    #[must_use]
76    pub fn new(top_n: u32, from: Option<String>, to: Option<String>) -> Self {
77        assert!(top_n >= 1, "top_n must be >= 1");
78        Self {
79            slow_heap: BinaryHeap::new(),
80            freq_map: HashMap::new(),
81            top_n: top_n as usize,
82            from,
83            to,
84        }
85    }
86
87    /// 处理单条日志记录,同时更新慢 SQL 堆与高频 SQL 映射。
88    pub fn update(&mut self, record: &dm_database_parser_sqllog::Sqllog) {
89        if !self.in_range(&record.ts) {
90            return;
91        }
92        let slow_entry = SlowSqlEntry {
93            sql_text: record.sql.clone(),
94            elapsed_ms: record.exectime,
95            timestamp: record.ts.clone(),
96        };
97        self.push_slow(slow_entry);
98
99        let normalized_key = crate::stats::normalize::normalize_sql(&record.sql);
100        let freq_state = self.freq_map.entry(normalized_key).or_insert(AggState {
101            call_count: 0,
102            total_elapsed: 0.0,
103            max_elapsed: f32::NEG_INFINITY,
104        });
105        freq_state.call_count += 1;
106        freq_state.total_elapsed += f64::from(record.exectime);
107        if record.exectime > freq_state.max_elapsed {
108            freq_state.max_elapsed = record.exectime;
109        }
110    }
111
112    /// 检查 `ts` 是否在 `[from, to]` 时间范围内(字符串前缀截取比较,零分配)。
113    fn in_range(&self, ts: &str) -> bool {
114        if let Some(from) = &self.from {
115            if ts.len() < from.len() {
116                return false;
117            }
118            if &ts[..from.len()] < from.as_str() {
119                return false;
120            }
121        }
122        if let Some(to) = &self.to {
123            if ts.len() < to.len() {
124                return false;
125            }
126            if &ts[..to.len()] > to.as_str() {
127                return false;
128            }
129        }
130        true
131    }
132
133    /// 推入慢 SQL 条目到最小堆,维护堆大小 ≤ `top_n`。
134    fn push_slow(&mut self, entry: SlowSqlEntry) {
135        if self.slow_heap.len() < self.top_n {
136            self.slow_heap.push(Reverse(entry));
137            return;
138        }
139        if let Some(Reverse(heap_top)) = self.slow_heap.peek() {
140            if entry.elapsed_ms.total_cmp(&heap_top.elapsed_ms) == Ordering::Greater {
141                self.slow_heap.pop();
142                self.slow_heap.push(Reverse(entry));
143            }
144        }
145    }
146
147    /// 消费聚合器,返回慢 SQL 行(按 elapsed 降序)与高频 SQL 行(按 `call_count` 降序)。
148    #[must_use]
149    pub fn into_results(self) -> (Vec<SlowSqlRow>, Vec<FrequentSqlRow>) {
150        let slow_rows = build_slow_rows(self.slow_heap);
151        let freq_rows = build_freq_rows(self.freq_map, self.top_n);
152        (slow_rows, freq_rows)
153    }
154}
155
156/// 从堆中提取并排序慢 SQL 行(降序)。
157fn build_slow_rows(heap: BinaryHeap<Reverse<SlowSqlEntry>>) -> Vec<SlowSqlRow> {
158    let mut rows: Vec<SlowSqlRow> = heap
159        .into_iter()
160        .map(|Reverse(entry)| SlowSqlRow {
161            sql_text: entry.sql_text,
162            elapsed_ms: crate::exporter::f32_ms_to_i64(entry.elapsed_ms),
163            timestamp: entry.timestamp,
164        })
165        .collect();
166    rows.sort_by_key(|row| std::cmp::Reverse(row.elapsed_ms));
167    rows
168}
169
170/// 从 `HashMap` 构建高频 SQL 行,排序并截断到 `top_n`。
171fn build_freq_rows(freq_map: HashMap<String, AggState>, top_n: usize) -> Vec<FrequentSqlRow> {
172    let mut rows: Vec<FrequentSqlRow> = freq_map
173        .into_iter()
174        .map(|(normalized_sql, state)| {
175            // f64 -> f32 精度在此为有意舍入(ms 级精度已足够),抑制 clippy 截断警告
176            #[expect(
177                clippy::cast_precision_loss,
178                reason = "call_count as f64 may lose precision for large counts; acceptable for stats use"
179            )]
180            #[expect(
181                clippy::cast_possible_truncation,
182                reason = "avg in ms fits f32 range for typical SQL elapsed times"
183            )]
184            let avg_f32 = (state.total_elapsed / state.call_count as f64) as f32;
185            FrequentSqlRow {
186                normalized_sql,
187                call_count: state.call_count,
188                avg_elapsed_ms: crate::exporter::f32_ms_to_i64(avg_f32),
189                max_elapsed_ms: crate::exporter::f32_ms_to_i64(state.max_elapsed),
190            }
191        })
192        .collect();
193    rows.sort_by_key(|row| std::cmp::Reverse(row.call_count));
194    rows.truncate(top_n);
195    rows
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use dm_database_parser_sqllog::Sqllog;

    /// Builds a `Sqllog` with only the fields the accumulator reads set;
    /// everything else comes from `Sqllog::default()`.
    fn make_record(sql: &str, exectime: f32, ts: &str) -> Sqllog {
        Sqllog {
            sql: sql.to_string(),
            exectime,
            ts: ts.to_string(),
            ..Sqllog::default()
        }
    }

    #[test]
    fn test_slow_sql_top_n_limit() {
        let mut acc = StatsAccumulator::new(3, None, None);
        acc.update(&make_record("SELECT 1", 10.0, "2025-01-01"));
        acc.update(&make_record("SELECT 2", 50.0, "2025-01-02"));
        acc.update(&make_record("SELECT 3", 30.0, "2025-01-03"));
        acc.update(&make_record("SELECT 4", 20.0, "2025-01-04"));
        acc.update(&make_record("SELECT 5", 40.0, "2025-01-05"));

        let (slow, _) = acc.into_results();
        assert_eq!(slow.len(), 3);
        // Descending: 50, 40, 30
        assert_eq!(slow[0].elapsed_ms, 50);
        assert_eq!(slow[1].elapsed_ms, 40);
        assert_eq!(slow[2].elapsed_ms, 30);
    }

    #[test]
    fn test_slow_sql_includes_zero_and_negative_elapsed() {
        let mut acc = StatsAccumulator::new(5, None, None);
        acc.update(&make_record("SELECT A", 0.0, "2025-01-01"));
        acc.update(&make_record("SELECT B", -1.0, "2025-01-02"));
        acc.update(&make_record("SELECT C", 5.0, "2025-01-03"));

        let (slow, _) = acc.into_results();
        assert_eq!(slow.len(), 3, "all 3 records should be included (D-12)");
    }

    #[test]
    fn test_frequent_sql_aggregation() {
        let mut acc = StatsAccumulator::new(10, None, None);
        // Three records that share one template
        acc.update(&make_record(
            "SELECT id FROM t WHERE id = 1",
            1.0,
            "2025-01-01",
        ));
        acc.update(&make_record(
            "SELECT id FROM t WHERE id = 2",
            2.0,
            "2025-01-02",
        ));
        acc.update(&make_record(
            "SELECT id FROM t WHERE id = 3",
            3.0,
            "2025-01-03",
        ));
        // One record with a different template
        acc.update(&make_record("INSERT INTO t VALUES (1)", 5.0, "2025-01-04"));

        let (_, frequent) = acc.into_results();
        // Locate the template that was called three times.
        let target = frequent
            .iter()
            .find(|r| r.call_count == 3)
            .expect("should find 3-call entry");
        assert_eq!(target.avg_elapsed_ms, 2, "avg of 1+2+3 = 2ms");
        assert_eq!(target.max_elapsed_ms, 3, "max of 1,2,3 = 3ms");
    }

    #[test]
    fn test_frequent_sql_top_n_limit_and_sort() {
        let mut acc = StatsAccumulator::new(3, None, None);
        for count in 1..=5u64 {
            let sql = format!("SELECT * FROM t{count}");
            for _ in 0..count {
                acc.update(&make_record(&sql, 1.0, "2025-01-01"));
            }
        }

        let (_, frequent) = acc.into_results();
        assert_eq!(frequent.len(), 3);
        assert_eq!(frequent[0].call_count, 5);
        assert_eq!(frequent[1].call_count, 4);
        assert_eq!(frequent[2].call_count, 3);
    }

    #[test]
    fn test_slow_entry_total_cmp_handles_equal_elapsed() {
        let mut acc = StatsAccumulator::new(1, None, None);
        acc.update(&make_record("SELECT X", 5.0, "2025-01-01"));
        acc.update(&make_record("SELECT Y", 5.0, "2025-01-02"));
        // Must not panic. An equal elapsed time does not evict the entry that
        // is already retained, so the first record wins.
        let (slow, _) = acc.into_results();
        assert_eq!(slow.len(), 1);
        assert_eq!(slow[0].sql_text, "SELECT X");
    }

    #[test]
    fn test_into_results_when_records_fewer_than_top_n() {
        let mut acc = StatsAccumulator::new(5, None, None);
        // Structurally different SQL, so normalize_sql yields two distinct keys
        // and the frequent list has exactly 2 rows.
        acc.update(&make_record("SELECT id FROM users", 10.0, "2025-01-01"));
        acc.update(&make_record(
            "INSERT INTO orders VALUES (1)",
            20.0,
            "2025-01-02",
        ));

        let (slow, frequent) = acc.into_results();
        assert_eq!(slow.len(), 2, "D-11: output only actual count");
        assert_eq!(frequent.len(), 2, "D-11: output only actual count");
    }

    #[test]
    fn test_filter_both_from_and_to_excludes_outside_records() {
        let mut acc = StatsAccumulator::new(
            10,
            Some("2024-01-15".to_string()),
            Some("2024-01-15".to_string()),
        );
        acc.update(&make_record("SELECT 1", 1.0, "2024-01-14 10:00:00"));
        acc.update(&make_record("SELECT 2", 2.0, "2024-01-15 00:00:00"));
        acc.update(&make_record("SELECT 3", 3.0, "2024-01-15 23:59:59"));
        acc.update(&make_record("SELECT 4", 4.0, "2024-01-16 10:00:00"));
        let (slow, _) = acc.into_results();
        assert_eq!(
            slow.len(),
            2,
            "only records on 2024-01-15 should be included"
        );
    }

    #[test]
    fn test_filter_from_only_excludes_earlier_records() {
        let mut acc = StatsAccumulator::new(10, Some("2024-01-15".to_string()), None);
        acc.update(&make_record("SELECT A", 1.0, "2024-01-14"));
        acc.update(&make_record("SELECT B", 2.0, "2024-01-15"));
        acc.update(&make_record("SELECT C", 3.0, "2024-01-20"));
        let (slow, _) = acc.into_results();
        assert_eq!(
            slow.len(),
            2,
            "records on/after 2024-01-15 should be included"
        );
    }

    #[test]
    fn test_filter_to_only_excludes_later_records() {
        let mut acc = StatsAccumulator::new(10, None, Some("2024-01-15".to_string()));
        acc.update(&make_record("SELECT A", 1.0, "2024-01-10"));
        acc.update(&make_record("SELECT B", 2.0, "2024-01-15"));
        acc.update(&make_record("SELECT C", 3.0, "2024-01-16"));
        let (slow, _) = acc.into_results();
        assert_eq!(
            slow.len(),
            2,
            "records on/before 2024-01-15 should be included"
        );
    }

    #[test]
    fn test_filter_none_behavior_unchanged() {
        let mut acc = StatsAccumulator::new(3, None, None);
        acc.update(&make_record("SELECT 1", 10.0, "2025-01-01"));
        acc.update(&make_record("SELECT 2", 50.0, "2025-01-02"));
        acc.update(&make_record("SELECT 3", 30.0, "2025-01-03"));
        acc.update(&make_record("SELECT 4", 20.0, "2025-01-04"));
        acc.update(&make_record("SELECT 5", 40.0, "2025-01-05"));
        let (slow, _) = acc.into_results();
        assert_eq!(slow.len(), 3);
        assert_eq!(slow[0].elapsed_ms, 50);
        assert_eq!(slow[1].elapsed_ms, 40);
        assert_eq!(slow[2].elapsed_ms, 30);
    }

    #[test]
    fn test_filter_ts_too_short_treated_as_out_of_range() {
        let mut acc = StatsAccumulator::new(10, Some("2024-01-15 10:00:00".to_string()), None);
        // ts is only 10 bytes, from is 19 bytes — length guard must fire
        acc.update(&make_record("SELECT X", 1.0, "2024-01-15"));
        let (slow, _) = acc.into_results();
        assert_eq!(
            slow.len(),
            0,
            "ts too short should be treated as out of range"
        );
    }

    #[test]
    fn test_filter_to_ts_too_short_treated_as_out_of_range() {
        let mut acc = StatsAccumulator::new(10, None, Some("2024-01-15 10:00:00".to_string()));
        // ts is only 10 bytes, to is 19 bytes — the same guard applies to `to`
        acc.update(&make_record("SELECT X", 1.0, "2024-01-15"));
        let (slow, _) = acc.into_results();
        assert_eq!(
            slow.len(),
            0,
            "ts shorter than `to` should be treated as out of range"
        );
    }

    #[test]
    fn test_filter_also_excludes_records_from_frequent_sql() {
        let mut acc = StatsAccumulator::new(10, Some("2024-01-15".to_string()), None);
        acc.update(&make_record("SELECT A", 1.0, "2024-01-14"));
        let (slow, frequent) = acc.into_results();
        assert!(slow.is_empty(), "out-of-range record must not be slow SQL");
        assert!(
            frequent.is_empty(),
            "out-of-range record must not be counted as frequent SQL"
        );
    }
}