1use std::cmp::{Ordering, Reverse};
4use std::collections::{BinaryHeap, HashMap};
5
/// One row of the "slowest statements" report returned by
/// [`StatsAccumulator::into_results`].
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare whole rows.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SlowSqlRow {
    /// SQL text of the statement, as stored on the log record.
    pub sql_text: String,
    /// Execution time as an integer, obtained from the record's `f32` value
    /// through `crate::exporter::f32_ms_to_i64` (milliseconds, per the name).
    pub elapsed_ms: i64,
    /// Timestamp string of the log record.
    pub timestamp: String,
}
13
/// One row of the "most frequent statements" report returned by
/// [`StatsAccumulator::into_results`].
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare whole rows.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FrequentSqlRow {
    /// Grouping key: the result of `crate::stats::normalize::normalize_sql`
    /// for every statement counted in this row.
    pub normalized_sql: String,
    /// Number of in-range records that produced this key.
    pub call_count: u64,
    /// Mean execution time of those records, converted to an integer.
    pub avg_elapsed_ms: i64,
    /// Largest execution time among those records, converted to an integer.
    pub max_elapsed_ms: i64,
}
22
/// Internal candidate stored in the slow-SQL top-N heap.
///
/// Comparison traits for this type look at `elapsed_ms` only (see the `Ord`
/// impl); the other fields are carried along as payload.
#[derive(Debug)]
struct SlowSqlEntry {
    /// SQL text cloned from the log record.
    sql_text: String,
    /// Raw execution time taken from the record's `exectime` field.
    elapsed_ms: f32,
    /// Timestamp cloned from the log record.
    timestamp: String,
}
30
31impl PartialEq for SlowSqlEntry {
32 fn eq(&self, other: &Self) -> bool {
33 self.elapsed_ms.total_cmp(&other.elapsed_ms) == Ordering::Equal
34 }
35}
36
// `eq` is defined through `f32::total_cmp`, which is a total order, so the
// relation is reflexive even for NaN and marking the type `Eq` is sound.
impl Eq for SlowSqlEntry {}
38
// Delegates to `Ord::cmp` so the partial and total orders can never disagree;
// the result is therefore always `Some`.
impl PartialOrd for SlowSqlEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
44
45impl Ord for SlowSqlEntry {
46 fn cmp(&self, other: &Self) -> Ordering {
47 self.elapsed_ms.total_cmp(&other.elapsed_ms)
48 }
49}
50
/// Running aggregate for one normalized SQL key in the frequency map.
#[derive(Debug)]
struct AggState {
    /// Number of records folded into this aggregate so far.
    call_count: u64,
    /// Sum of the records' execution times, accumulated as `f64`.
    total_elapsed: f64,
    /// Largest execution time seen so far; starts at `f32::NEG_INFINITY` when
    /// the aggregate is created in `StatsAccumulator::update`.
    max_elapsed: f32,
}
58
/// Streaming accumulator that builds two reports from SQL log records:
/// the slowest statements and the most frequently executed statements.
#[derive(Debug)]
pub struct StatsAccumulator {
    /// Heap of slow-SQL candidates; wrapping entries in `Reverse` puts the
    /// entry with the smallest elapsed time at the top of the heap.
    slow_heap: BinaryHeap<Reverse<SlowSqlEntry>>,
    /// Per-key aggregates, keyed by the normalized SQL text.
    freq_map: HashMap<String, AggState>,
    /// Maximum number of rows kept for each report.
    top_n: usize,
    /// Optional lower bound compared against the leading part of each record
    /// timestamp; `None` means no lower bound.
    from: Option<String>,
    /// Optional upper bound compared against the leading part of each record
    /// timestamp; `None` means no upper bound.
    to: Option<String>,
}
68
69impl StatsAccumulator {
70 #[must_use]
76 pub fn new(top_n: u32, from: Option<String>, to: Option<String>) -> Self {
77 assert!(top_n >= 1, "top_n must be >= 1");
78 Self {
79 slow_heap: BinaryHeap::new(),
80 freq_map: HashMap::new(),
81 top_n: top_n as usize,
82 from,
83 to,
84 }
85 }
86
87 pub fn update(&mut self, record: &dm_database_parser_sqllog::Sqllog) {
89 if !self.in_range(&record.ts) {
90 return;
91 }
92 let slow_entry = SlowSqlEntry {
93 sql_text: record.sql.clone(),
94 elapsed_ms: record.exectime,
95 timestamp: record.ts.clone(),
96 };
97 self.push_slow(slow_entry);
98
99 let normalized_key = crate::stats::normalize::normalize_sql(&record.sql);
100 let freq_state = self.freq_map.entry(normalized_key).or_insert(AggState {
101 call_count: 0,
102 total_elapsed: 0.0,
103 max_elapsed: f32::NEG_INFINITY,
104 });
105 freq_state.call_count += 1;
106 freq_state.total_elapsed += f64::from(record.exectime);
107 if record.exectime > freq_state.max_elapsed {
108 freq_state.max_elapsed = record.exectime;
109 }
110 }
111
112 fn in_range(&self, ts: &str) -> bool {
114 if let Some(from) = &self.from {
115 if ts.len() < from.len() {
116 return false;
117 }
118 if &ts[..from.len()] < from.as_str() {
119 return false;
120 }
121 }
122 if let Some(to) = &self.to {
123 if ts.len() < to.len() {
124 return false;
125 }
126 if &ts[..to.len()] > to.as_str() {
127 return false;
128 }
129 }
130 true
131 }
132
133 fn push_slow(&mut self, entry: SlowSqlEntry) {
135 if self.slow_heap.len() < self.top_n {
136 self.slow_heap.push(Reverse(entry));
137 return;
138 }
139 if let Some(Reverse(heap_top)) = self.slow_heap.peek() {
140 if entry.elapsed_ms.total_cmp(&heap_top.elapsed_ms) == Ordering::Greater {
141 self.slow_heap.pop();
142 self.slow_heap.push(Reverse(entry));
143 }
144 }
145 }
146
147 #[must_use]
149 pub fn into_results(self) -> (Vec<SlowSqlRow>, Vec<FrequentSqlRow>) {
150 let slow_rows = build_slow_rows(self.slow_heap);
151 let freq_rows = build_freq_rows(self.freq_map, self.top_n);
152 (slow_rows, freq_rows)
153 }
154}
155
156fn build_slow_rows(heap: BinaryHeap<Reverse<SlowSqlEntry>>) -> Vec<SlowSqlRow> {
158 let mut rows: Vec<SlowSqlRow> = heap
159 .into_iter()
160 .map(|Reverse(entry)| SlowSqlRow {
161 sql_text: entry.sql_text,
162 elapsed_ms: crate::exporter::f32_ms_to_i64(entry.elapsed_ms),
163 timestamp: entry.timestamp,
164 })
165 .collect();
166 rows.sort_by_key(|row| std::cmp::Reverse(row.elapsed_ms));
167 rows
168}
169
170fn build_freq_rows(freq_map: HashMap<String, AggState>, top_n: usize) -> Vec<FrequentSqlRow> {
172 let mut rows: Vec<FrequentSqlRow> = freq_map
173 .into_iter()
174 .map(|(normalized_sql, state)| {
175 #[expect(
177 clippy::cast_precision_loss,
178 reason = "call_count as f64 may lose precision for large counts; acceptable for stats use"
179 )]
180 #[expect(
181 clippy::cast_possible_truncation,
182 reason = "avg in ms fits f32 range for typical SQL elapsed times"
183 )]
184 let avg_f32 = (state.total_elapsed / state.call_count as f64) as f32;
185 FrequentSqlRow {
186 normalized_sql,
187 call_count: state.call_count,
188 avg_elapsed_ms: crate::exporter::f32_ms_to_i64(avg_f32),
189 max_elapsed_ms: crate::exporter::f32_ms_to_i64(state.max_elapsed),
190 }
191 })
192 .collect();
193 rows.sort_by_key(|row| std::cmp::Reverse(row.call_count));
194 rows.truncate(top_n);
195 rows
196}
197
// Unit tests for `StatsAccumulator`: top-N selection, frequency aggregation
// and timestamp range filtering.
#[cfg(test)]
mod tests {
    use super::*;
    use dm_database_parser_sqllog::Sqllog;

    /// Builds a record with only the fields the accumulator reads; every other
    /// field keeps its `Default` value.
    fn make_record(sql: &str, exectime: f32, ts: &str) -> Sqllog {
        Sqllog {
            sql: sql.to_string(),
            exectime,
            ts: ts.to_string(),
            ..Sqllog::default()
        }
    }

    /// With five records and `top_n = 3`, only the three slowest are kept,
    /// slowest first.
    #[test]
    fn test_slow_sql_top_n_limit() {
        let mut acc = StatsAccumulator::new(3, None, None);
        acc.update(&make_record("SELECT 1", 10.0, "2025-01-01"));
        acc.update(&make_record("SELECT 2", 50.0, "2025-01-02"));
        acc.update(&make_record("SELECT 3", 30.0, "2025-01-03"));
        acc.update(&make_record("SELECT 4", 20.0, "2025-01-04"));
        acc.update(&make_record("SELECT 5", 40.0, "2025-01-05"));

        let (slow, _) = acc.into_results();
        assert_eq!(slow.len(), 3);
        // Rows are expected in descending elapsed order.
        assert_eq!(slow[0].elapsed_ms, 50);
        assert_eq!(slow[1].elapsed_ms, 40);
        assert_eq!(slow[2].elapsed_ms, 30);
    }

    /// Zero and negative elapsed times are not filtered out.
    #[test]
    fn test_slow_sql_includes_zero_and_negative_elapsed() {
        let mut acc = StatsAccumulator::new(5, None, None);
        acc.update(&make_record("SELECT A", 0.0, "2025-01-01"));
        acc.update(&make_record("SELECT B", -1.0, "2025-01-02"));
        acc.update(&make_record("SELECT C", 5.0, "2025-01-03"));

        let (slow, _) = acc.into_results();
        assert_eq!(slow.len(), 3, "all 3 records should be included (D-12)");
    }

    /// Statements that share a normalized key are aggregated into one row with
    /// the combined count, average and maximum.
    #[test]
    fn test_frequent_sql_aggregation() {
        let mut acc = StatsAccumulator::new(10, None, None);
        // The three SELECTs differ only in the literal; the assertions below
        // rely on them sharing one normalized key.
        acc.update(&make_record(
            "SELECT id FROM t WHERE id = 1",
            1.0,
            "2025-01-01",
        ));
        acc.update(&make_record(
            "SELECT id FROM t WHERE id = 2",
            2.0,
            "2025-01-02",
        ));
        acc.update(&make_record(
            "SELECT id FROM t WHERE id = 3",
            3.0,
            "2025-01-03",
        ));
        // A different statement that must not be merged into the group above.
        acc.update(&make_record("INSERT INTO t VALUES (1)", 5.0, "2025-01-04"));

        let (_, frequent) = acc.into_results();
        // Locate the aggregated SELECT row by its call count.
        let target = frequent.iter().find(|r| r.call_count == 3);
        assert!(target.is_some(), "should find 3-call entry");
        let target = target.unwrap();
        assert_eq!(target.avg_elapsed_ms, 2, "avg of 1+2+3 = 2ms");
        assert_eq!(target.max_elapsed_ms, 3, "max of 1,2,3 = 3ms");
    }

    /// The frequency report is truncated to `top_n` and sorted by call count,
    /// most frequent first.
    #[test]
    fn test_frequent_sql_top_n_limit_and_sort() {
        let mut acc = StatsAccumulator::new(3, None, None);
        // Statement `t{count}` is recorded `count` times, giving counts 1..=5.
        for count in 1..=5u64 {
            let sql = format!("SELECT * FROM t{count}");
            for _ in 0..count {
                acc.update(&make_record(&sql, 1.0, "2025-01-01"));
            }
        }

        let (_, frequent) = acc.into_results();
        assert_eq!(frequent.len(), 3);
        assert_eq!(frequent[0].call_count, 5);
        assert_eq!(frequent[1].call_count, 4);
        assert_eq!(frequent[2].call_count, 3);
    }

    /// Two records with identical elapsed time and `top_n = 1` still yield
    /// exactly one row.
    #[test]
    fn test_slow_entry_total_cmp_handles_equal_elapsed() {
        let mut acc = StatsAccumulator::new(1, None, None);
        acc.update(&make_record("SELECT X", 5.0, "2025-01-01"));
        acc.update(&make_record("SELECT Y", 5.0, "2025-01-02"));
        let (slow, _) = acc.into_results();
        // Only the row count is checked, not which of the two records is kept.
        assert_eq!(slow.len(), 1);
    }

    /// When fewer records than `top_n` were seen, both reports contain only
    /// the rows that exist.
    #[test]
    fn test_into_results_when_records_fewer_than_top_n() {
        let mut acc = StatsAccumulator::new(5, None, None);
        acc.update(&make_record("SELECT id FROM users", 10.0, "2025-01-01"));
        // A second, unrelated statement so both reports hold two rows.
        acc.update(&make_record(
            "INSERT INTO orders VALUES (1)",
            20.0,
            "2025-01-02",
        ));

        let (slow, frequent) = acc.into_results();
        assert_eq!(slow.len(), 2, "D-11: output only actual count");
        assert_eq!(frequent.len(), 2, "D-11: output only actual count");
    }

    /// With the same date as `from` and `to`, only records whose timestamp
    /// starts with that date are counted.
    #[test]
    fn test_filter_both_from_and_to_excludes_outside_records() {
        let mut acc = StatsAccumulator::new(
            10,
            Some("2024-01-15".to_string()),
            Some("2024-01-15".to_string()),
        );
        acc.update(&make_record("SELECT 1", 1.0, "2024-01-14 10:00:00"));
        acc.update(&make_record("SELECT 2", 2.0, "2024-01-15 00:00:00"));
        acc.update(&make_record("SELECT 3", 3.0, "2024-01-15 23:59:59"));
        acc.update(&make_record("SELECT 4", 4.0, "2024-01-16 10:00:00"));
        let (slow, _) = acc.into_results();
        assert_eq!(
            slow.len(),
            2,
            "only records on 2024-01-15 should be included"
        );
    }

    /// A `from` bound alone drops earlier records and keeps the boundary date.
    #[test]
    fn test_filter_from_only_excludes_earlier_records() {
        let mut acc = StatsAccumulator::new(10, Some("2024-01-15".to_string()), None);
        acc.update(&make_record("SELECT A", 1.0, "2024-01-14"));
        acc.update(&make_record("SELECT B", 2.0, "2024-01-15"));
        acc.update(&make_record("SELECT C", 3.0, "2024-01-20"));
        let (slow, _) = acc.into_results();
        assert_eq!(
            slow.len(),
            2,
            "records on/after 2024-01-15 should be included"
        );
    }

    /// A `to` bound alone drops later records and keeps the boundary date.
    #[test]
    fn test_filter_to_only_excludes_later_records() {
        let mut acc = StatsAccumulator::new(10, None, Some("2024-01-15".to_string()));
        acc.update(&make_record("SELECT A", 1.0, "2024-01-10"));
        acc.update(&make_record("SELECT B", 2.0, "2024-01-15"));
        acc.update(&make_record("SELECT C", 3.0, "2024-01-16"));
        let (slow, _) = acc.into_results();
        assert_eq!(
            slow.len(),
            2,
            "records on/before 2024-01-15 should be included"
        );
    }

    /// Without any bounds the accumulator behaves exactly like the plain
    /// top-N case.
    #[test]
    fn test_filter_none_behavior_unchanged() {
        let mut acc = StatsAccumulator::new(3, None, None);
        acc.update(&make_record("SELECT 1", 10.0, "2025-01-01"));
        acc.update(&make_record("SELECT 2", 50.0, "2025-01-02"));
        acc.update(&make_record("SELECT 3", 30.0, "2025-01-03"));
        acc.update(&make_record("SELECT 4", 20.0, "2025-01-04"));
        acc.update(&make_record("SELECT 5", 40.0, "2025-01-05"));
        let (slow, _) = acc.into_results();
        assert_eq!(slow.len(), 3);
        assert_eq!(slow[0].elapsed_ms, 50);
        assert_eq!(slow[1].elapsed_ms, 40);
        assert_eq!(slow[2].elapsed_ms, 30);
    }

    /// A timestamp shorter than the bound cannot be compared against it and is
    /// dropped.
    #[test]
    fn test_filter_ts_too_short_treated_as_out_of_range() {
        let mut acc = StatsAccumulator::new(10, Some("2024-01-15 10:00:00".to_string()), None);
        acc.update(&make_record("SELECT X", 1.0, "2024-01-15"));
        // The record has only a date while the bound includes a time of day.
        let (slow, _) = acc.into_results();
        assert_eq!(
            slow.len(),
            0,
            "ts too short should be treated as out of range"
        );
    }
}