1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use parking_lot::RwLock;
12use serde::Serialize;
13use tracing::{info, warn};
14
15#[derive(Debug, Clone)]
17pub struct QueryLogConfig {
18 pub slow_query_threshold_ms: u64,
20 pub log_all_queries: bool,
22 pub enable_stats: bool,
24 pub max_slow_query_history: usize,
26}
27
28impl Default for QueryLogConfig {
29 fn default() -> Self {
30 Self {
31 slow_query_threshold_ms: 100, log_all_queries: false,
33 enable_stats: true,
34 max_slow_query_history: 1000,
35 }
36 }
37}
38
39impl QueryLogConfig {
40 pub fn development() -> Self {
42 Self {
43 slow_query_threshold_ms: 50,
44 log_all_queries: true,
45 enable_stats: true,
46 max_slow_query_history: 100,
47 }
48 }
49
50 pub fn production() -> Self {
52 Self {
53 slow_query_threshold_ms: 200,
54 log_all_queries: false,
55 enable_stats: true,
56 max_slow_query_history: 500,
57 }
58 }
59}
60
61#[derive(Debug, Clone, Serialize)]
63pub struct SlowQueryRecord {
64 pub query_hash: u64,
65 pub query_preview: String,
66 pub duration_ms: u64,
67 pub timestamp: chrono::DateTime<chrono::Utc>,
68 pub location: Option<String>,
69}
70
71#[derive(Debug, Clone, Serialize)]
73pub struct QueryStats {
74 pub query_hash: u64,
75 pub query_preview: String,
76 pub call_count: u64,
77 pub total_duration_ms: u64,
78 pub avg_duration_ms: f64,
79 pub max_duration_ms: u64,
80 pub min_duration_ms: u64,
81 pub slow_count: u64,
82}
83
84#[derive(Debug, Clone, Serialize)]
86pub struct AggregateStats {
87 pub total_queries: u64,
88 pub total_slow_queries: u64,
89 pub total_duration_ms: u64,
90 pub avg_duration_ms: f64,
91 pub queries_per_second: f64,
92 pub uptime_seconds: u64,
93}
94
95pub struct QueryLogger {
97 config: QueryLogConfig,
98 stats: Arc<RwLock<HashMap<u64, QueryStatsInner>>>,
99 slow_queries: Arc<RwLock<Vec<SlowQueryRecord>>>,
100 total_queries: AtomicU64,
101 total_slow: AtomicU64,
102 total_duration_ms: AtomicU64,
103 start_time: Instant,
104}
105
106#[derive(Debug, Clone)]
107struct QueryStatsInner {
108 query_preview: String,
109 call_count: u64,
110 total_duration_ms: u64,
111 max_duration_ms: u64,
112 min_duration_ms: u64,
113 slow_count: u64,
114}
115
116impl QueryLogger {
117 pub fn new(config: QueryLogConfig) -> Self {
118 Self {
119 config,
120 stats: Arc::new(RwLock::new(HashMap::new())),
121 slow_queries: Arc::new(RwLock::new(Vec::new())),
122 total_queries: AtomicU64::new(0),
123 total_slow: AtomicU64::new(0),
124 total_duration_ms: AtomicU64::new(0),
125 start_time: Instant::now(),
126 }
127 }
128
129 pub fn with_default_config() -> Self {
130 Self::new(QueryLogConfig::default())
131 }
132
133 pub fn log_query(&self, query: &str, duration: Duration, location: Option<&str>) {
135 let duration_ms = duration.as_millis() as u64;
136 let query_hash = hash_query(query);
137 let query_preview = truncate_query(query, 200);
138
139 self.total_queries.fetch_add(1, Ordering::Relaxed);
141 self.total_duration_ms
142 .fetch_add(duration_ms, Ordering::Relaxed);
143
144 let is_slow = duration_ms >= self.config.slow_query_threshold_ms;
145
146 if is_slow {
147 self.total_slow.fetch_add(1, Ordering::Relaxed);
148
149 warn!(
150 query = %query_preview,
151 duration_ms = duration_ms,
152 location = ?location,
153 "Slow query detected"
154 );
155
156 let mut slow_queries = self.slow_queries.write();
158 slow_queries.push(SlowQueryRecord {
159 query_hash,
160 query_preview: query_preview.clone(),
161 duration_ms,
162 timestamp: chrono::Utc::now(),
163 location: location.map(|s| s.to_string()),
164 });
165
166 if slow_queries.len() > self.config.max_slow_query_history {
168 slow_queries.remove(0);
169 }
170 } else if self.config.log_all_queries {
171 info!(
172 query = %query_preview,
173 duration_ms = duration_ms,
174 "Query executed"
175 );
176 }
177
178 if self.config.enable_stats {
180 let mut stats = self.stats.write();
181 let entry = stats.entry(query_hash).or_insert_with(|| QueryStatsInner {
182 query_preview,
183 call_count: 0,
184 total_duration_ms: 0,
185 max_duration_ms: 0,
186 min_duration_ms: u64::MAX,
187 slow_count: 0,
188 });
189
190 entry.call_count += 1;
191 entry.total_duration_ms += duration_ms;
192 entry.max_duration_ms = entry.max_duration_ms.max(duration_ms);
193 entry.min_duration_ms = entry.min_duration_ms.min(duration_ms);
194 if is_slow {
195 entry.slow_count += 1;
196 }
197 }
198 }
199
200 pub fn get_stats(&self) -> Vec<QueryStats> {
202 let stats = self.stats.read();
203 stats
204 .iter()
205 .map(|(hash, inner)| QueryStats {
206 query_hash: *hash,
207 query_preview: inner.query_preview.clone(),
208 call_count: inner.call_count,
209 total_duration_ms: inner.total_duration_ms,
210 avg_duration_ms: inner.total_duration_ms as f64 / inner.call_count.max(1) as f64,
211 max_duration_ms: inner.max_duration_ms,
212 min_duration_ms: if inner.min_duration_ms == u64::MAX {
213 0
214 } else {
215 inner.min_duration_ms
216 },
217 slow_count: inner.slow_count,
218 })
219 .collect()
220 }
221
222 pub fn get_slowest_queries(&self, n: usize) -> Vec<QueryStats> {
224 let mut stats = self.get_stats();
225 stats.sort_by(|a, b| b.avg_duration_ms.partial_cmp(&a.avg_duration_ms).unwrap());
226 stats.truncate(n);
227 stats
228 }
229
230 pub fn get_most_frequent_queries(&self, n: usize) -> Vec<QueryStats> {
232 let mut stats = self.get_stats();
233 stats.sort_by(|a, b| b.call_count.cmp(&a.call_count));
234 stats.truncate(n);
235 stats
236 }
237
238 pub fn get_slow_query_history(&self, limit: usize) -> Vec<SlowQueryRecord> {
240 let slow_queries = self.slow_queries.read();
241 slow_queries.iter().rev().take(limit).cloned().collect()
242 }
243
244 pub fn get_aggregate_stats(&self) -> AggregateStats {
246 let total_queries = self.total_queries.load(Ordering::Relaxed);
247 let total_slow = self.total_slow.load(Ordering::Relaxed);
248 let total_duration = self.total_duration_ms.load(Ordering::Relaxed);
249 let uptime = self.start_time.elapsed().as_secs();
250
251 AggregateStats {
252 total_queries,
253 total_slow_queries: total_slow,
254 total_duration_ms: total_duration,
255 avg_duration_ms: total_duration as f64 / total_queries.max(1) as f64,
256 queries_per_second: total_queries as f64 / uptime.max(1) as f64,
257 uptime_seconds: uptime,
258 }
259 }
260
261 pub fn reset_stats(&self) {
263 self.stats.write().clear();
264 self.slow_queries.write().clear();
265 self.total_queries.store(0, Ordering::Relaxed);
266 self.total_slow.store(0, Ordering::Relaxed);
267 self.total_duration_ms.store(0, Ordering::Relaxed);
268 }
269
270 pub fn is_slow(&self, duration: Duration) -> bool {
272 duration.as_millis() as u64 >= self.config.slow_query_threshold_ms
273 }
274}
275
276fn hash_query(query: &str) -> u64 {
278 use std::hash::{Hash, Hasher};
279 let mut hasher = std::collections::hash_map::DefaultHasher::new();
280 let normalized: String = query.split_whitespace().collect::<Vec<_>>().join(" ");
282 normalized.hash(&mut hasher);
283 hasher.finish()
284}
285
286fn truncate_query(query: &str, max_len: usize) -> String {
288 let normalized: String = query.split_whitespace().collect::<Vec<_>>().join(" ");
289 if normalized.len() <= max_len {
290 normalized
291 } else {
292 format!("{}...", &normalized[..max_len])
293 }
294}
295
296pub struct QueryTimer<'a> {
298 logger: &'a QueryLogger,
299 query: String,
300 location: Option<String>,
301 start: Instant,
302}
303
304impl<'a> QueryTimer<'a> {
305 pub fn new(logger: &'a QueryLogger, query: impl Into<String>) -> Self {
306 Self {
307 logger,
308 query: query.into(),
309 location: None,
310 start: Instant::now(),
311 }
312 }
313
314 pub fn with_location(mut self, location: impl Into<String>) -> Self {
315 self.location = Some(location.into());
316 self
317 }
318
319 pub fn finish(self) {
321 }
323}
324
325impl Drop for QueryTimer<'_> {
326 fn drop(&mut self) {
327 let duration = self.start.elapsed();
328 self.logger
329 .log_query(&self.query, duration, self.location.as_deref());
330 }
331}
332
333#[inline]
335pub fn time_query(logger: &QueryLogger, query: impl Into<String>) -> QueryTimer<'_> {
336 QueryTimer::new(logger, query)
337}