1use crate::{
4 analytics_error, normalize_log_entries, EventSourceKind, LogEntry, NormalizedEvent, Result,
5};
6use chrono::{DateTime, Datelike, Duration, Timelike, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
/// Tunable settings for `TimingAnalyzer`.
#[derive(Debug, Clone)]
pub struct TimingAnalyzerConfig {
    /// Bucket width supplied via `TimingAnalyzer::with_bucket_size`; defaults to `60`.
    ///
    /// NOTE(review): no analysis method in this file reads this value — hourly aggregation
    /// is always by hour of day. The unit (seconds vs. minutes) is not established here; confirm.
    pub time_bucket_size: u32,
    /// When `true`, timing analysis also counts events whose message mentions
    /// "connection", grouped by hour of day.
    pub include_connections: bool,
    /// When `true`, timing analysis fills in the list of peak hours.
    pub include_peak_analysis: bool,
}

impl Default for TimingAnalyzerConfig {
    /// Bucket size `60`, with connection counting and peak-hour detection both enabled.
    fn default() -> Self {
        let enabled = true;
        TimingAnalyzerConfig {
            include_connections: enabled,
            include_peak_analysis: enabled,
            time_bucket_size: 60,
        }
    }
}
30
31pub struct TimingAnalyzer {
33 config: TimingAnalyzerConfig,
35}
36
37impl TimingAnalyzer {
38 pub fn new() -> Self {
40 Self {
41 config: TimingAnalyzerConfig::default(),
42 }
43 }
44
45 pub fn with_config(config: TimingAnalyzerConfig) -> Self {
47 Self { config }
48 }
49
50 pub fn with_bucket_size(time_bucket_size: u32) -> Self {
52 Self {
53 config: TimingAnalyzerConfig {
54 time_bucket_size,
55 ..Default::default()
56 },
57 }
58 }
59
60 pub fn analyze_timing(&self, entries: &[LogEntry]) -> Result<TimingAnalysis> {
62 let events = normalize_log_entries(entries, EventSourceKind::Stderr);
63 self.analyze_timing_events(&events)
64 }
65
66 pub fn analyze_timing_events(&self, events: &[NormalizedEvent]) -> Result<TimingAnalysis> {
68 if events.is_empty() {
69 return Ok(TimingAnalysis::default());
70 }
71
72 let mut hourly_patterns = HashMap::new();
73 let mut daily_patterns = HashMap::new();
74 let mut response_times = Vec::new();
75 let mut connection_patterns = HashMap::new();
76 let mut peak_hours = Vec::new();
77
78 for event in events {
80 if let Some(duration) = event.duration_ms() {
81 response_times.push(duration);
82
83 let hour = event.timestamp.hour();
85 let current_duration = hourly_patterns.entry(hour).or_insert(0.0);
86 *current_duration += duration;
87
88 let day = event.timestamp.weekday().num_days_from_monday();
90 let current_day_duration = daily_patterns.entry(day).or_insert(0.0);
91 *current_day_duration += duration;
92 }
93
94 if self.config.include_connections
96 && event.message().to_lowercase().contains("connection")
97 {
98 let hour = event.timestamp.hour();
99 *connection_patterns.entry(hour).or_insert(0) += 1;
100 }
101 }
102
103 let avg_response_time = if !response_times.is_empty() {
105 response_times.iter().sum::<f64>() / response_times.len() as f64
106 } else {
107 0.0
108 };
109
110 let mut sorted_times = response_times.clone();
111 sorted_times.sort_by(|a, b| a.partial_cmp(b).unwrap());
112
113 let p95_response_time = if !sorted_times.is_empty() {
114 let p95_index = (sorted_times.len() as f64 * 0.95) as usize;
115 sorted_times[p95_index.min(sorted_times.len() - 1)]
116 } else {
117 0.0
118 };
119
120 let p99_response_time = if !sorted_times.is_empty() {
121 let p99_index = (sorted_times.len() as f64 * 0.99) as usize;
122 sorted_times[p99_index.min(sorted_times.len() - 1)]
123 } else {
124 0.0
125 };
126
127 if self.config.include_peak_analysis {
129 peak_hours = self.identify_peak_hours(&hourly_patterns);
130 }
131
132 Ok(TimingAnalysis {
133 average_response_time: Duration::milliseconds(avg_response_time as i64),
134 p95_response_time: Duration::milliseconds(p95_response_time as i64),
135 p99_response_time: Duration::milliseconds(p99_response_time as i64),
136 hourly_patterns,
137 daily_patterns,
138 connection_patterns,
139 peak_hours,
140 total_queries: response_times.len() as u64,
141 total_duration: response_times.iter().sum(),
142 })
143 }
144
145 pub fn calculate_percentiles(
147 &self,
148 response_times: &[f64],
149 percentiles: &[f64],
150 ) -> Result<Vec<(f64, f64)>> {
151 if response_times.is_empty() {
152 return Err(analytics_error(
153 "No response times provided",
154 "calculate_percentiles",
155 ));
156 }
157
158 let mut sorted_times = response_times.to_vec();
159 sorted_times.sort_by(|a, b| a.partial_cmp(b).unwrap());
160
161 let mut result = Vec::new();
162 for &percentile in percentiles {
163 if !(0.0..=1.0).contains(&percentile) {
164 return Err(analytics_error(
165 &format!("Invalid percentile: {}", percentile),
166 "calculate_percentiles",
167 ));
168 }
169
170 let index = (sorted_times.len() as f64 * percentile) as usize;
171 let value = sorted_times[index.min(sorted_times.len() - 1)];
172 result.push((percentile, value));
173 }
174
175 Ok(result)
176 }
177
178 pub fn analyze_hourly_distribution(
180 &self,
181 entries: &[LogEntry],
182 ) -> Result<HashMap<u32, HourlyMetrics>> {
183 let mut hourly_metrics = HashMap::new();
184
185 for entry in entries {
186 if entry.is_query() {
187 let hour = entry.timestamp.hour();
188 let metrics = hourly_metrics.entry(hour).or_insert_with(|| HourlyMetrics {
189 hour,
190 query_count: 0,
191 total_duration: 0.0,
192 average_duration: 0.0,
193 min_duration: f64::INFINITY,
194 max_duration: 0.0,
195 queries_per_second: 0.0,
196 });
197
198 let duration = entry.duration.unwrap_or(0.0);
199 metrics.query_count += 1;
200 metrics.total_duration += duration;
201 metrics.min_duration = metrics.min_duration.min(duration);
202 metrics.max_duration = metrics.max_duration.max(duration);
203 }
204 }
205
206 for metrics in hourly_metrics.values_mut() {
208 if metrics.query_count > 0 {
209 metrics.average_duration = metrics.total_duration / metrics.query_count as f64;
210 }
211 if metrics.min_duration == f64::INFINITY {
212 metrics.min_duration = 0.0;
213 }
214 }
215
216 self.calculate_queries_per_second(&mut hourly_metrics, entries);
218
219 Ok(hourly_metrics)
220 }
221
222 pub fn analyze_connection_patterns(&self, entries: &[LogEntry]) -> Result<ConnectionAnalysis> {
224 let mut hourly_connections = HashMap::new();
225 let mut daily_connections = HashMap::new();
226 let mut total_connections = 0;
227 let mut connection_errors = 0;
228
229 for entry in entries {
230 if entry.message.to_lowercase().contains("connection") {
231 total_connections += 1;
232
233 let hour = entry.timestamp.hour();
234 *hourly_connections.entry(hour).or_insert(0) += 1;
235
236 let day = entry.timestamp.weekday().num_days_from_monday();
237 *daily_connections.entry(day).or_insert(0) += 1;
238
239 if entry.is_error() {
240 connection_errors += 1;
241 }
242 }
243 }
244
245 Ok(ConnectionAnalysis {
246 total_connections,
247 connection_errors,
248 hourly_connections,
249 daily_connections,
250 error_rate: if total_connections > 0 {
251 connection_errors as f64 / total_connections as f64
252 } else {
253 0.0
254 },
255 })
256 }
257
258 fn identify_peak_hours(&self, hourly_patterns: &HashMap<u32, f64>) -> Vec<u32> {
260 if hourly_patterns.is_empty() {
261 return Vec::new();
262 }
263
264 let avg_duration = hourly_patterns.values().sum::<f64>() / hourly_patterns.len() as f64;
265 let threshold = avg_duration * 1.5; let mut peak_hours: Vec<_> = hourly_patterns
268 .iter()
269 .filter(|(_, &duration)| duration > threshold)
270 .map(|(&hour, _)| hour)
271 .collect();
272
273 peak_hours.sort();
274 peak_hours
275 }
276
277 fn calculate_queries_per_second(
279 &self,
280 hourly_metrics: &mut HashMap<u32, HourlyMetrics>,
281 entries: &[LogEntry],
282 ) {
283 let mut hourly_entries: HashMap<u32, Vec<DateTime<Utc>>> = HashMap::new();
285
286 for entry in entries {
287 if entry.is_query() {
288 let hour = entry.timestamp.hour();
289 hourly_entries
290 .entry(hour)
291 .or_default()
292 .push(entry.timestamp);
293 }
294 }
295
296 for (hour, timestamps) in hourly_entries {
297 if let Some(metrics) = hourly_metrics.get_mut(&hour) {
298 if timestamps.len() > 1 {
299 let min_time = timestamps.iter().min().unwrap();
300 let max_time = timestamps.iter().max().unwrap();
301 let duration_seconds = (*max_time - *min_time).num_seconds() as f64;
302
303 if duration_seconds > 0.0 {
304 metrics.queries_per_second = metrics.query_count as f64 / duration_seconds;
305 }
306 }
307 }
308 }
309 }
310
311 pub fn get_peak_usage_analysis(&self, entries: &[LogEntry]) -> Result<PeakUsageAnalysis> {
313 let hourly_distribution = self.analyze_hourly_distribution(entries)?;
314
315 if hourly_distribution.is_empty() {
316 return Ok(PeakUsageAnalysis::default());
317 }
318
319 let max_queries = hourly_distribution
320 .values()
321 .map(|m| m.query_count)
322 .max()
323 .unwrap_or(0);
324 let max_duration = hourly_distribution
325 .values()
326 .map(|m| m.total_duration)
327 .fold(0.0_f64, f64::max);
328
329 let peak_hours: Vec<_> = hourly_distribution
330 .iter()
331 .filter(|(_, metrics)| {
332 metrics.query_count as f64 >= max_queries as f64 * 0.8 || metrics.total_duration >= max_duration * 0.8 })
335 .map(|(&hour, _)| hour)
336 .collect();
337
338 let busiest_hour = hourly_distribution
339 .iter()
340 .max_by(|(_, a), (_, b)| a.query_count.cmp(&b.query_count))
341 .map(|(&hour, _)| hour);
342
343 Ok(PeakUsageAnalysis {
344 peak_hours,
345 busiest_hour,
346 max_queries_per_hour: max_queries,
347 max_duration_per_hour: max_duration,
348 average_queries_per_hour: hourly_distribution
349 .values()
350 .map(|m| m.query_count)
351 .sum::<u64>()
352 / hourly_distribution.len() as u64,
353 })
354 }
355}
356
357impl Default for TimingAnalyzer {
358 fn default() -> Self {
359 Self::new()
360 }
361}
362
363#[derive(Debug, Clone, Serialize, Deserialize)]
365pub struct TimingAnalysis {
366 pub average_response_time: Duration,
367 pub p95_response_time: Duration,
368 pub p99_response_time: Duration,
369 pub hourly_patterns: HashMap<u32, f64>,
370 pub daily_patterns: HashMap<u32, f64>,
371 pub connection_patterns: HashMap<u32, u64>,
372 pub peak_hours: Vec<u32>,
373 pub total_queries: u64,
374 pub total_duration: f64,
375}
376
377impl Default for TimingAnalysis {
378 fn default() -> Self {
379 Self {
380 average_response_time: Duration::zero(),
381 p95_response_time: Duration::zero(),
382 p99_response_time: Duration::zero(),
383 hourly_patterns: HashMap::new(),
384 daily_patterns: HashMap::new(),
385 connection_patterns: HashMap::new(),
386 peak_hours: Vec::new(),
387 total_queries: 0,
388 total_duration: 0.0,
389 }
390 }
391}
392
393#[derive(Debug, Clone, Serialize, Deserialize)]
395pub struct HourlyMetrics {
396 pub hour: u32,
397 pub query_count: u64,
398 pub total_duration: f64,
399 pub average_duration: f64,
400 pub min_duration: f64,
401 pub max_duration: f64,
402 pub queries_per_second: f64,
403}
404
405#[derive(Debug, Clone, Serialize, Deserialize)]
407pub struct ConnectionAnalysis {
408 pub total_connections: u64,
409 pub connection_errors: u64,
410 pub hourly_connections: HashMap<u32, u64>,
411 pub daily_connections: HashMap<u32, u64>,
412 pub error_rate: f64,
413}
414
415#[derive(Debug, Clone, Serialize, Deserialize)]
417pub struct PeakUsageAnalysis {
418 pub peak_hours: Vec<u32>,
419 pub busiest_hour: Option<u32>,
420 pub max_queries_per_hour: u64,
421 pub max_duration_per_hour: f64,
422 pub average_queries_per_hour: u64,
423}
424
425impl Default for PeakUsageAnalysis {
426 fn default() -> Self {
427 Self {
428 peak_hours: Vec::new(),
429 busiest_hour: None,
430 max_queries_per_hour: 0,
431 max_duration_per_hour: 0.0,
432 average_queries_per_hour: 0,
433 }
434 }
435}
436
#[cfg(test)]
mod tests {
    use super::*;
    use crate::LogLevel;

    /// Builds a `LogEntry` with fixed session metadata (pid `12345`, user `test_user`,
    /// database `testdb`, application `psql`) and the given timestamp, level, duration
    /// and message.
    fn create_test_entry(
        timestamp: DateTime<Utc>,
        message_type: LogLevel,
        duration: Option<f64>,
        message: &str,
    ) -> LogEntry {
        LogEntry {
            timestamp,
            process_id: String::from("12345"),
            user: Some(String::from("test_user")),
            database: Some(String::from("testdb")),
            client_host: None,
            application_name: Some(String::from("psql")),
            message_type,
            message: String::from(message),
            queries: None,
            duration,
        }
    }

    /// A `Statement` entry with the given duration (ms) and SQL text.
    fn statement(timestamp: DateTime<Utc>, duration_ms: f64, sql: &str) -> LogEntry {
        create_test_entry(timestamp, LogLevel::Statement, Some(duration_ms), sql)
    }

    #[test]
    fn test_analyze_timing_empty_entries() {
        let result = TimingAnalyzer::new().analyze_timing(&[]).unwrap();

        assert_eq!(result.total_queries, 0);
        assert_eq!(result.total_duration, 0.0);
        assert!(result.hourly_patterns.is_empty());
    }

    #[test]
    fn test_analyze_timing_with_entries() {
        let now = Utc::now();
        let entries: Vec<LogEntry> = [
            (100.0, "statement: SELECT 1"),
            (200.0, "statement: SELECT 2"),
            (300.0, "statement: SELECT 3"),
        ]
        .iter()
        .map(|&(ms, sql)| statement(now, ms, sql))
        .collect();

        let result = TimingAnalyzer::new().analyze_timing(&entries).unwrap();

        assert_eq!(result.total_queries, 3);
        assert_eq!(result.total_duration, 600.0);
        assert_eq!(result.average_response_time.num_milliseconds(), 200);
    }

    #[test]
    fn test_analyze_timing_events_matches_log_entry_analysis() {
        let analyzer = TimingAnalyzer::new();
        let now = Utc::now();
        let entries = vec![
            statement(now, 100.0, "statement: SELECT 1"),
            statement(now + Duration::seconds(1), 250.0, "statement: SELECT 2"),
            create_test_entry(
                now + Duration::seconds(2),
                LogLevel::Log,
                None,
                "connection received",
            ),
        ];

        let events = normalize_log_entries(&entries, EventSourceKind::Stderr);
        let from_entries = analyzer.analyze_timing(&entries).unwrap();
        let from_events = analyzer.analyze_timing_events(&events).unwrap();

        assert_eq!(from_events.total_queries, from_entries.total_queries);
        assert_eq!(from_events.total_duration, from_entries.total_duration);
        assert_eq!(
            from_events.average_response_time,
            from_entries.average_response_time
        );
        assert_eq!(from_events.hourly_patterns, from_entries.hourly_patterns);
        assert_eq!(
            from_events.connection_patterns,
            from_entries.connection_patterns
        );
    }

    #[test]
    fn test_calculate_percentiles() {
        // 10.0, 20.0, ..., 100.0
        let response_times: Vec<f64> = (1..=10).map(|k| f64::from(k) * 10.0).collect();

        let result = TimingAnalyzer::new()
            .calculate_percentiles(&response_times, &[0.5, 0.95, 0.99])
            .unwrap();

        assert_eq!(result, vec![(0.5, 60.0), (0.95, 100.0), (0.99, 100.0)]);
    }

    #[test]
    fn test_analyze_connection_patterns() {
        let now = Utc::now();
        let entries: Vec<LogEntry> = vec![
            (LogLevel::Log, "connection received"),
            (LogLevel::Log, "connection established"),
            (LogLevel::Error, "connection failed"),
        ]
        .into_iter()
        .map(|(level, message)| create_test_entry(now, level, None, message))
        .collect();

        let result = TimingAnalyzer::new()
            .analyze_connection_patterns(&entries)
            .unwrap();

        assert_eq!(result.total_connections, 3);
        assert_eq!(result.connection_errors, 1);
        assert_eq!(result.error_rate, 1.0 / 3.0);
    }

    #[test]
    fn test_peak_usage_analysis() {
        let now = Utc::now();
        // Timestamp `offset_hours` after `now`, with the sub-second part set to `ms` milliseconds.
        let at = |offset_hours: i64, ms: u32| {
            (now + Duration::hours(offset_hours))
                .with_nanosecond(ms * 1_000_000)
                .unwrap()
        };
        let select_at = |ts| statement(ts, 100.0, "statement: SELECT 1");

        // 5 + 3 queries land in one hour bucket, 10 in the following one.
        let entries: Vec<LogEntry> = (0..5)
            .map(|i| select_at(at(10, i)))
            .chain((0..10).map(|i| select_at(at(11, i))))
            .chain((0..3).map(|i| select_at(at(10, i))))
            .collect();

        let result = TimingAnalyzer::new()
            .get_peak_usage_analysis(&entries)
            .unwrap();

        assert_eq!(result.max_queries_per_hour, 10);
        assert!(result.busiest_hour.is_some());
        assert!(!result.peak_hours.is_empty());
    }

    #[test]
    fn test_invalid_percentile() {
        let result = TimingAnalyzer::new().calculate_percentiles(&[10.0, 20.0, 30.0], &[1.5]);

        assert!(result.is_err());
    }
}