kaccy_db/
query_logger.rs

1//! Query logging and slow query detection
2//!
3//! Provides middleware for tracking SQL query performance
4//! and detecting slow queries.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use parking_lot::RwLock;
12use serde::Serialize;
13use tracing::{info, warn};
14
/// Configuration for query logging.
///
/// Consumed by [`QueryLogger`], which consults these settings each time a
/// query is logged.
#[derive(Debug, Clone)]
pub struct QueryLogConfig {
    /// Threshold for slow query warnings (milliseconds).
    ///
    /// A query whose duration is greater than or equal to this value is
    /// treated as slow.
    pub slow_query_threshold_ms: u64,
    /// Whether to log all queries (not just slow ones).
    ///
    /// Slow queries are always logged as warnings; this flag additionally
    /// emits an info-level event for queries below the threshold.
    pub log_all_queries: bool,
    /// Whether to capture per-pattern query statistics.
    pub enable_stats: bool,
    /// Maximum number of slow queries to retain in history.
    ///
    /// Once the history grows past this size the oldest record is discarded.
    pub max_slow_query_history: usize,
}
27
28impl Default for QueryLogConfig {
29    fn default() -> Self {
30        Self {
31            slow_query_threshold_ms: 100, // 100ms default
32            log_all_queries: false,
33            enable_stats: true,
34            max_slow_query_history: 1000,
35        }
36    }
37}
38
39impl QueryLogConfig {
40    /// Development configuration (verbose logging)
41    pub fn development() -> Self {
42        Self {
43            slow_query_threshold_ms: 50,
44            log_all_queries: true,
45            enable_stats: true,
46            max_slow_query_history: 100,
47        }
48    }
49
50    /// Production configuration (minimal logging)
51    pub fn production() -> Self {
52        Self {
53            slow_query_threshold_ms: 200,
54            log_all_queries: false,
55            enable_stats: true,
56            max_slow_query_history: 500,
57        }
58    }
59}
60
/// Record of a single slow query execution.
///
/// One record is appended to the logger's history each time a query meets
/// or exceeds the configured slow-query threshold.
#[derive(Debug, Clone, Serialize)]
pub struct SlowQueryRecord {
    /// Hash of the whitespace-normalized query text; identifies the pattern.
    pub query_hash: u64,
    /// Whitespace-normalized query text, cut to at most 200 bytes and
    /// suffixed with `...` when it was cut.
    pub query_preview: String,
    /// Execution time in whole milliseconds.
    pub duration_ms: u64,
    /// UTC time at which the query was logged (not when it started).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional caller-supplied label for where the query was issued.
    pub location: Option<String>,
}
70
/// Statistics for a specific query pattern.
///
/// A pattern is identified by the hash of the whitespace-normalized query
/// text, so queries differing only in whitespace share one entry.
#[derive(Debug, Clone, Serialize)]
pub struct QueryStats {
    /// Hash of the whitespace-normalized query text.
    pub query_hash: u64,
    /// Preview of the first query seen for this pattern.
    pub query_preview: String,
    /// Number of times this pattern was logged.
    pub call_count: u64,
    /// Sum of all logged durations, in milliseconds.
    pub total_duration_ms: u64,
    /// `total_duration_ms / call_count`, in milliseconds.
    pub avg_duration_ms: f64,
    /// Longest logged duration, in milliseconds.
    pub max_duration_ms: u64,
    /// Shortest logged duration, in milliseconds (0 if none was recorded).
    pub min_duration_ms: u64,
    /// Number of calls that met or exceeded the slow-query threshold.
    pub slow_count: u64,
}
83
/// Aggregate statistics across every query seen by a logger.
#[derive(Debug, Clone, Serialize)]
pub struct AggregateStats {
    /// Total number of queries logged.
    pub total_queries: u64,
    /// Number of logged queries that met or exceeded the slow threshold.
    pub total_slow_queries: u64,
    /// Sum of all logged durations, in milliseconds.
    pub total_duration_ms: u64,
    /// `total_duration_ms / total_queries` (0.0 when nothing was logged).
    pub avg_duration_ms: f64,
    /// `total_queries / uptime_seconds`, with uptime clamped to at least 1
    /// second to avoid dividing by zero.
    pub queries_per_second: f64,
    /// Whole seconds elapsed since the logger was constructed.
    pub uptime_seconds: u64,
}
94
95/// Query logger service
96pub struct QueryLogger {
97    config: QueryLogConfig,
98    stats: Arc<RwLock<HashMap<u64, QueryStatsInner>>>,
99    slow_queries: Arc<RwLock<Vec<SlowQueryRecord>>>,
100    total_queries: AtomicU64,
101    total_slow: AtomicU64,
102    total_duration_ms: AtomicU64,
103    start_time: Instant,
104}
105
106#[derive(Debug, Clone)]
107struct QueryStatsInner {
108    query_preview: String,
109    call_count: u64,
110    total_duration_ms: u64,
111    max_duration_ms: u64,
112    min_duration_ms: u64,
113    slow_count: u64,
114}
115
116impl QueryLogger {
117    pub fn new(config: QueryLogConfig) -> Self {
118        Self {
119            config,
120            stats: Arc::new(RwLock::new(HashMap::new())),
121            slow_queries: Arc::new(RwLock::new(Vec::new())),
122            total_queries: AtomicU64::new(0),
123            total_slow: AtomicU64::new(0),
124            total_duration_ms: AtomicU64::new(0),
125            start_time: Instant::now(),
126        }
127    }
128
129    pub fn with_default_config() -> Self {
130        Self::new(QueryLogConfig::default())
131    }
132
133    /// Log a query execution
134    pub fn log_query(&self, query: &str, duration: Duration, location: Option<&str>) {
135        let duration_ms = duration.as_millis() as u64;
136        let query_hash = hash_query(query);
137        let query_preview = truncate_query(query, 200);
138
139        // Update totals
140        self.total_queries.fetch_add(1, Ordering::Relaxed);
141        self.total_duration_ms
142            .fetch_add(duration_ms, Ordering::Relaxed);
143
144        let is_slow = duration_ms >= self.config.slow_query_threshold_ms;
145
146        if is_slow {
147            self.total_slow.fetch_add(1, Ordering::Relaxed);
148
149            warn!(
150                query = %query_preview,
151                duration_ms = duration_ms,
152                location = ?location,
153                "Slow query detected"
154            );
155
156            // Record slow query
157            let mut slow_queries = self.slow_queries.write();
158            slow_queries.push(SlowQueryRecord {
159                query_hash,
160                query_preview: query_preview.clone(),
161                duration_ms,
162                timestamp: chrono::Utc::now(),
163                location: location.map(|s| s.to_string()),
164            });
165
166            // Trim history if needed
167            if slow_queries.len() > self.config.max_slow_query_history {
168                slow_queries.remove(0);
169            }
170        } else if self.config.log_all_queries {
171            info!(
172                query = %query_preview,
173                duration_ms = duration_ms,
174                "Query executed"
175            );
176        }
177
178        // Update stats if enabled
179        if self.config.enable_stats {
180            let mut stats = self.stats.write();
181            let entry = stats.entry(query_hash).or_insert_with(|| QueryStatsInner {
182                query_preview,
183                call_count: 0,
184                total_duration_ms: 0,
185                max_duration_ms: 0,
186                min_duration_ms: u64::MAX,
187                slow_count: 0,
188            });
189
190            entry.call_count += 1;
191            entry.total_duration_ms += duration_ms;
192            entry.max_duration_ms = entry.max_duration_ms.max(duration_ms);
193            entry.min_duration_ms = entry.min_duration_ms.min(duration_ms);
194            if is_slow {
195                entry.slow_count += 1;
196            }
197        }
198    }
199
200    /// Get statistics for all queries
201    pub fn get_stats(&self) -> Vec<QueryStats> {
202        let stats = self.stats.read();
203        stats
204            .iter()
205            .map(|(hash, inner)| QueryStats {
206                query_hash: *hash,
207                query_preview: inner.query_preview.clone(),
208                call_count: inner.call_count,
209                total_duration_ms: inner.total_duration_ms,
210                avg_duration_ms: inner.total_duration_ms as f64 / inner.call_count.max(1) as f64,
211                max_duration_ms: inner.max_duration_ms,
212                min_duration_ms: if inner.min_duration_ms == u64::MAX {
213                    0
214                } else {
215                    inner.min_duration_ms
216                },
217                slow_count: inner.slow_count,
218            })
219            .collect()
220    }
221
222    /// Get top N slowest query patterns
223    pub fn get_slowest_queries(&self, n: usize) -> Vec<QueryStats> {
224        let mut stats = self.get_stats();
225        stats.sort_by(|a, b| b.avg_duration_ms.partial_cmp(&a.avg_duration_ms).unwrap());
226        stats.truncate(n);
227        stats
228    }
229
230    /// Get top N most frequent queries
231    pub fn get_most_frequent_queries(&self, n: usize) -> Vec<QueryStats> {
232        let mut stats = self.get_stats();
233        stats.sort_by(|a, b| b.call_count.cmp(&a.call_count));
234        stats.truncate(n);
235        stats
236    }
237
238    /// Get recent slow query records
239    pub fn get_slow_query_history(&self, limit: usize) -> Vec<SlowQueryRecord> {
240        let slow_queries = self.slow_queries.read();
241        slow_queries.iter().rev().take(limit).cloned().collect()
242    }
243
244    /// Get aggregate statistics
245    pub fn get_aggregate_stats(&self) -> AggregateStats {
246        let total_queries = self.total_queries.load(Ordering::Relaxed);
247        let total_slow = self.total_slow.load(Ordering::Relaxed);
248        let total_duration = self.total_duration_ms.load(Ordering::Relaxed);
249        let uptime = self.start_time.elapsed().as_secs();
250
251        AggregateStats {
252            total_queries,
253            total_slow_queries: total_slow,
254            total_duration_ms: total_duration,
255            avg_duration_ms: total_duration as f64 / total_queries.max(1) as f64,
256            queries_per_second: total_queries as f64 / uptime.max(1) as f64,
257            uptime_seconds: uptime,
258        }
259    }
260
261    /// Reset statistics
262    pub fn reset_stats(&self) {
263        self.stats.write().clear();
264        self.slow_queries.write().clear();
265        self.total_queries.store(0, Ordering::Relaxed);
266        self.total_slow.store(0, Ordering::Relaxed);
267        self.total_duration_ms.store(0, Ordering::Relaxed);
268    }
269
270    /// Check if a duration is considered slow
271    pub fn is_slow(&self, duration: Duration) -> bool {
272        duration.as_millis() as u64 >= self.config.slow_query_threshold_ms
273    }
274}
275
/// Hash a query string, ignoring differences in whitespace.
///
/// Runs of whitespace are collapsed to single spaces and leading/trailing
/// whitespace is dropped before hashing, so reformatted copies of the same
/// query share one hash.
fn hash_query(query: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Rebuild the query with exactly one space between tokens.
    let mut canonical = String::with_capacity(query.len());
    for token in query.split_whitespace() {
        if !canonical.is_empty() {
            canonical.push(' ');
        }
        canonical.push_str(token);
    }

    let mut state = DefaultHasher::new();
    canonical.hash(&mut state);
    state.finish()
}
285
/// Truncate a query string for display.
///
/// Whitespace is normalized first (runs collapsed to single spaces, ends
/// trimmed). If the result is longer than `max_len` bytes it is cut to at
/// most `max_len` bytes and `...` is appended.
///
/// The cut never splits a multi-byte UTF-8 character: it backs up to the
/// nearest character boundary at or below `max_len`, so queries containing
/// non-ASCII text cannot cause a slicing panic.
fn truncate_query(query: &str, max_len: usize) -> String {
    let normalized: String = query.split_whitespace().collect::<Vec<_>>().join(" ");
    if normalized.len() <= max_len {
        return normalized;
    }

    // `max_len < normalized.len()` here. Index 0 is always a boundary, so
    // this loop terminates.
    let mut cut = max_len;
    while !normalized.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...", &normalized[..cut])
}
295
/// Query timing guard - logs query duration when dropped.
///
/// Bind the guard to a named variable (for example `let _timer = ...`) for
/// as long as the query runs. An unbound guard, or one bound with
/// `let _ = ...`, is dropped immediately and logs a near-zero duration.
pub struct QueryTimer<'a> {
    /// Logger that receives the measurement on drop.
    logger: &'a QueryLogger,
    /// Query text passed through to the logger.
    query: String,
    /// Optional label describing where the query was issued.
    location: Option<String>,
    /// Moment the guard was created; the duration is measured from here.
    start: Instant,
}
303
304impl<'a> QueryTimer<'a> {
305    pub fn new(logger: &'a QueryLogger, query: impl Into<String>) -> Self {
306        Self {
307            logger,
308            query: query.into(),
309            location: None,
310            start: Instant::now(),
311        }
312    }
313
314    pub fn with_location(mut self, location: impl Into<String>) -> Self {
315        self.location = Some(location.into());
316        self
317    }
318
319    /// Manually finish timing (called automatically on drop)
320    pub fn finish(self) {
321        // Drop will handle logging
322    }
323}
324
325impl Drop for QueryTimer<'_> {
326    fn drop(&mut self) {
327        let duration = self.start.elapsed();
328        self.logger
329            .log_query(&self.query, duration, self.location.as_deref());
330    }
331}
332
333/// Start timing a query
334#[inline]
335pub fn time_query(logger: &QueryLogger, query: impl Into<String>) -> QueryTimer<'_> {
336    QueryTimer::new(logger, query)
337}