// Source: pg_logstats/analytics/timing.rs

//! Performance timing analysis for PostgreSQL logs
2
use crate::{
    analytics_error, normalize_log_entries, EventSourceKind, LogEntry, NormalizedEvent, Result,
};
use chrono::{DateTime, Datelike, Duration, Timelike, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
9
/// Settings that control what a `TimingAnalyzer` computes.
#[derive(Debug, Clone)]
pub struct TimingAnalyzerConfig {
    /// Aggregation bucket width, in minutes.
    ///
    /// NOTE(review): the analysis methods in this file group by hour of day
    /// and do not read this value; confirm whether other modules consume it.
    pub time_bucket_size: u32,
    /// When `true`, timing analysis also counts connection-related messages per hour.
    pub include_connections: bool,
    /// When `true`, timing analysis also reports peak usage hours.
    pub include_peak_analysis: bool,
}
20
21impl Default for TimingAnalyzerConfig {
22    fn default() -> Self {
23        Self {
24            time_bucket_size: 60, // 1 hour default
25            include_connections: true,
26            include_peak_analysis: true,
27        }
28    }
29}
30
31/// Analyzer for timing and performance metrics
32pub struct TimingAnalyzer {
33    /// Configuration for timing analysis
34    config: TimingAnalyzerConfig,
35}
36
37impl TimingAnalyzer {
38    /// Create a new timing analyzer with default configuration
39    pub fn new() -> Self {
40        Self {
41            config: TimingAnalyzerConfig::default(),
42        }
43    }
44
45    /// Create a new timing analyzer with custom configuration
46    pub fn with_config(config: TimingAnalyzerConfig) -> Self {
47        Self { config }
48    }
49
50    /// Create a new timing analyzer with custom bucket size
51    pub fn with_bucket_size(time_bucket_size: u32) -> Self {
52        Self {
53            config: TimingAnalyzerConfig {
54                time_bucket_size,
55                ..Default::default()
56            },
57        }
58    }
59
60    /// Analyze timing patterns in log entries
61    pub fn analyze_timing(&self, entries: &[LogEntry]) -> Result<TimingAnalysis> {
62        let events = normalize_log_entries(entries, EventSourceKind::Stderr);
63        self.analyze_timing_events(&events)
64    }
65
66    /// Analyze timing patterns in normalized events.
67    pub fn analyze_timing_events(&self, events: &[NormalizedEvent]) -> Result<TimingAnalysis> {
68        if events.is_empty() {
69            return Ok(TimingAnalysis::default());
70        }
71
72        let mut hourly_patterns = HashMap::new();
73        let mut daily_patterns = HashMap::new();
74        let mut response_times = Vec::new();
75        let mut connection_patterns = HashMap::new();
76        let mut peak_hours = Vec::new();
77
78        // Process each entry
79        for event in events {
80            if let Some(duration) = event.duration_ms() {
81                response_times.push(duration);
82
83                // Group by hour
84                let hour = event.timestamp.hour();
85                let current_duration = hourly_patterns.entry(hour).or_insert(0.0);
86                *current_duration += duration;
87
88                // Group by day of week
89                let day = event.timestamp.weekday().num_days_from_monday();
90                let current_day_duration = daily_patterns.entry(day).or_insert(0.0);
91                *current_day_duration += duration;
92            }
93
94            // Analyze connection patterns if enabled
95            if self.config.include_connections
96                && event.message().to_lowercase().contains("connection")
97            {
98                let hour = event.timestamp.hour();
99                *connection_patterns.entry(hour).or_insert(0) += 1;
100            }
101        }
102
103        // Calculate basic statistics
104        let avg_response_time = if !response_times.is_empty() {
105            response_times.iter().sum::<f64>() / response_times.len() as f64
106        } else {
107            0.0
108        };
109
110        let mut sorted_times = response_times.clone();
111        sorted_times.sort_by(|a, b| a.partial_cmp(b).unwrap());
112
113        let p95_response_time = if !sorted_times.is_empty() {
114            let p95_index = (sorted_times.len() as f64 * 0.95) as usize;
115            sorted_times[p95_index.min(sorted_times.len() - 1)]
116        } else {
117            0.0
118        };
119
120        let p99_response_time = if !sorted_times.is_empty() {
121            let p99_index = (sorted_times.len() as f64 * 0.99) as usize;
122            sorted_times[p99_index.min(sorted_times.len() - 1)]
123        } else {
124            0.0
125        };
126
127        // Identify peak usage hours if enabled
128        if self.config.include_peak_analysis {
129            peak_hours = self.identify_peak_hours(&hourly_patterns);
130        }
131
132        Ok(TimingAnalysis {
133            average_response_time: Duration::milliseconds(avg_response_time as i64),
134            p95_response_time: Duration::milliseconds(p95_response_time as i64),
135            p99_response_time: Duration::milliseconds(p99_response_time as i64),
136            hourly_patterns,
137            daily_patterns,
138            connection_patterns,
139            peak_hours,
140            total_queries: response_times.len() as u64,
141            total_duration: response_times.iter().sum(),
142        })
143    }
144
145    /// Calculate response time percentiles
146    pub fn calculate_percentiles(
147        &self,
148        response_times: &[f64],
149        percentiles: &[f64],
150    ) -> Result<Vec<(f64, f64)>> {
151        if response_times.is_empty() {
152            return Err(analytics_error(
153                "No response times provided",
154                "calculate_percentiles",
155            ));
156        }
157
158        let mut sorted_times = response_times.to_vec();
159        sorted_times.sort_by(|a, b| a.partial_cmp(b).unwrap());
160
161        let mut result = Vec::new();
162        for &percentile in percentiles {
163            if !(0.0..=1.0).contains(&percentile) {
164                return Err(analytics_error(
165                    &format!("Invalid percentile: {}", percentile),
166                    "calculate_percentiles",
167                ));
168            }
169
170            let index = (sorted_times.len() as f64 * percentile) as usize;
171            let value = sorted_times[index.min(sorted_times.len() - 1)];
172            result.push((percentile, value));
173        }
174
175        Ok(result)
176    }
177
178    /// Analyze hourly query distribution
179    pub fn analyze_hourly_distribution(
180        &self,
181        entries: &[LogEntry],
182    ) -> Result<HashMap<u32, HourlyMetrics>> {
183        let mut hourly_metrics = HashMap::new();
184
185        for entry in entries {
186            if entry.is_query() {
187                let hour = entry.timestamp.hour();
188                let metrics = hourly_metrics.entry(hour).or_insert_with(|| HourlyMetrics {
189                    hour,
190                    query_count: 0,
191                    total_duration: 0.0,
192                    average_duration: 0.0,
193                    min_duration: f64::INFINITY,
194                    max_duration: 0.0,
195                    queries_per_second: 0.0,
196                });
197
198                let duration = entry.duration.unwrap_or(0.0);
199                metrics.query_count += 1;
200                metrics.total_duration += duration;
201                metrics.min_duration = metrics.min_duration.min(duration);
202                metrics.max_duration = metrics.max_duration.max(duration);
203            }
204        }
205
206        // Calculate averages and queries per second
207        for metrics in hourly_metrics.values_mut() {
208            if metrics.query_count > 0 {
209                metrics.average_duration = metrics.total_duration / metrics.query_count as f64;
210            }
211            if metrics.min_duration == f64::INFINITY {
212                metrics.min_duration = 0.0;
213            }
214        }
215
216        // Calculate queries per second (simplified - would need time range for accurate calculation)
217        self.calculate_queries_per_second(&mut hourly_metrics, entries);
218
219        Ok(hourly_metrics)
220    }
221
222    /// Analyze connection patterns
223    pub fn analyze_connection_patterns(&self, entries: &[LogEntry]) -> Result<ConnectionAnalysis> {
224        let mut hourly_connections = HashMap::new();
225        let mut daily_connections = HashMap::new();
226        let mut total_connections = 0;
227        let mut connection_errors = 0;
228
229        for entry in entries {
230            if entry.message.to_lowercase().contains("connection") {
231                total_connections += 1;
232
233                let hour = entry.timestamp.hour();
234                *hourly_connections.entry(hour).or_insert(0) += 1;
235
236                let day = entry.timestamp.weekday().num_days_from_monday();
237                *daily_connections.entry(day).or_insert(0) += 1;
238
239                if entry.is_error() {
240                    connection_errors += 1;
241                }
242            }
243        }
244
245        Ok(ConnectionAnalysis {
246            total_connections,
247            connection_errors,
248            hourly_connections,
249            daily_connections,
250            error_rate: if total_connections > 0 {
251                connection_errors as f64 / total_connections as f64
252            } else {
253                0.0
254            },
255        })
256    }
257
258    /// Identify peak usage hours
259    fn identify_peak_hours(&self, hourly_patterns: &HashMap<u32, f64>) -> Vec<u32> {
260        if hourly_patterns.is_empty() {
261            return Vec::new();
262        }
263
264        let avg_duration = hourly_patterns.values().sum::<f64>() / hourly_patterns.len() as f64;
265        let threshold = avg_duration * 1.5; // 50% above average
266
267        let mut peak_hours: Vec<_> = hourly_patterns
268            .iter()
269            .filter(|(_, &duration)| duration > threshold)
270            .map(|(&hour, _)| hour)
271            .collect();
272
273        peak_hours.sort();
274        peak_hours
275    }
276
277    /// Calculate queries per second for hourly buckets
278    fn calculate_queries_per_second(
279        &self,
280        hourly_metrics: &mut HashMap<u32, HourlyMetrics>,
281        entries: &[LogEntry],
282    ) {
283        // Group entries by hour to calculate time spans
284        let mut hourly_entries: HashMap<u32, Vec<DateTime<Utc>>> = HashMap::new();
285
286        for entry in entries {
287            if entry.is_query() {
288                let hour = entry.timestamp.hour();
289                hourly_entries
290                    .entry(hour)
291                    .or_default()
292                    .push(entry.timestamp);
293            }
294        }
295
296        for (hour, timestamps) in hourly_entries {
297            if let Some(metrics) = hourly_metrics.get_mut(&hour) {
298                if timestamps.len() > 1 {
299                    let min_time = timestamps.iter().min().unwrap();
300                    let max_time = timestamps.iter().max().unwrap();
301                    let duration_seconds = (*max_time - *min_time).num_seconds() as f64;
302
303                    if duration_seconds > 0.0 {
304                        metrics.queries_per_second = metrics.query_count as f64 / duration_seconds;
305                    }
306                }
307            }
308        }
309    }
310
311    /// Get peak usage analysis
312    pub fn get_peak_usage_analysis(&self, entries: &[LogEntry]) -> Result<PeakUsageAnalysis> {
313        let hourly_distribution = self.analyze_hourly_distribution(entries)?;
314
315        if hourly_distribution.is_empty() {
316            return Ok(PeakUsageAnalysis::default());
317        }
318
319        let max_queries = hourly_distribution
320            .values()
321            .map(|m| m.query_count)
322            .max()
323            .unwrap_or(0);
324        let max_duration = hourly_distribution
325            .values()
326            .map(|m| m.total_duration)
327            .fold(0.0_f64, f64::max);
328
329        let peak_hours: Vec<_> = hourly_distribution
330            .iter()
331            .filter(|(_, metrics)| {
332                metrics.query_count as f64 >= max_queries as f64 * 0.8 || // 80% of max queries
333                metrics.total_duration >= max_duration * 0.8 // 80% of max duration
334            })
335            .map(|(&hour, _)| hour)
336            .collect();
337
338        let busiest_hour = hourly_distribution
339            .iter()
340            .max_by(|(_, a), (_, b)| a.query_count.cmp(&b.query_count))
341            .map(|(&hour, _)| hour);
342
343        Ok(PeakUsageAnalysis {
344            peak_hours,
345            busiest_hour,
346            max_queries_per_hour: max_queries,
347            max_duration_per_hour: max_duration,
348            average_queries_per_hour: hourly_distribution
349                .values()
350                .map(|m| m.query_count)
351                .sum::<u64>()
352                / hourly_distribution.len() as u64,
353        })
354    }
355}
356
357impl Default for TimingAnalyzer {
358    fn default() -> Self {
359        Self::new()
360    }
361}
362
363/// Results of timing analysis
364#[derive(Debug, Clone, Serialize, Deserialize)]
365pub struct TimingAnalysis {
366    pub average_response_time: Duration,
367    pub p95_response_time: Duration,
368    pub p99_response_time: Duration,
369    pub hourly_patterns: HashMap<u32, f64>,
370    pub daily_patterns: HashMap<u32, f64>,
371    pub connection_patterns: HashMap<u32, u64>,
372    pub peak_hours: Vec<u32>,
373    pub total_queries: u64,
374    pub total_duration: f64,
375}
376
377impl Default for TimingAnalysis {
378    fn default() -> Self {
379        Self {
380            average_response_time: Duration::zero(),
381            p95_response_time: Duration::zero(),
382            p99_response_time: Duration::zero(),
383            hourly_patterns: HashMap::new(),
384            daily_patterns: HashMap::new(),
385            connection_patterns: HashMap::new(),
386            peak_hours: Vec::new(),
387            total_queries: 0,
388            total_duration: 0.0,
389        }
390    }
391}
392
393/// Hourly metrics for detailed analysis
394#[derive(Debug, Clone, Serialize, Deserialize)]
395pub struct HourlyMetrics {
396    pub hour: u32,
397    pub query_count: u64,
398    pub total_duration: f64,
399    pub average_duration: f64,
400    pub min_duration: f64,
401    pub max_duration: f64,
402    pub queries_per_second: f64,
403}
404
405/// Connection pattern analysis
406#[derive(Debug, Clone, Serialize, Deserialize)]
407pub struct ConnectionAnalysis {
408    pub total_connections: u64,
409    pub connection_errors: u64,
410    pub hourly_connections: HashMap<u32, u64>,
411    pub daily_connections: HashMap<u32, u64>,
412    pub error_rate: f64,
413}
414
415/// Peak usage analysis
416#[derive(Debug, Clone, Serialize, Deserialize)]
417pub struct PeakUsageAnalysis {
418    pub peak_hours: Vec<u32>,
419    pub busiest_hour: Option<u32>,
420    pub max_queries_per_hour: u64,
421    pub max_duration_per_hour: f64,
422    pub average_queries_per_hour: u64,
423}
424
425impl Default for PeakUsageAnalysis {
426    fn default() -> Self {
427        Self {
428            peak_hours: Vec::new(),
429            busiest_hour: None,
430            max_queries_per_hour: 0,
431            max_duration_per_hour: 0.0,
432            average_queries_per_hour: 0,
433        }
434    }
435}
436
#[cfg(test)]
mod tests {
    use super::*;
    use crate::LogLevel;

    /// Build a `LogEntry` with fixed session metadata; only the fields that
    /// matter to the timing analysis vary.
    fn create_test_entry(
        timestamp: DateTime<Utc>,
        message_type: LogLevel,
        duration: Option<f64>,
        message: &str,
    ) -> LogEntry {
        LogEntry {
            timestamp,
            process_id: "12345".to_string(),
            user: Some("test_user".to_string()),
            database: Some("testdb".to_string()),
            client_host: None,
            application_name: Some("psql".to_string()),
            message_type,
            message: message.to_string(),
            queries: None,
            duration,
        }
    }

    /// Append `count` 100 ms statements, spaced 1 ms apart within the same
    /// second as `base`.
    fn push_statements(entries: &mut Vec<LogEntry>, base: DateTime<Utc>, count: u32) {
        for i in 0..count {
            let timestamp = base
                .with_nanosecond(i * 1_000_000)
                .expect("offset stays below one second");
            entries.push(create_test_entry(
                timestamp,
                LogLevel::Statement,
                Some(100.0),
                "statement: SELECT 1",
            ));
        }
    }

    #[test]
    fn test_analyze_timing_empty_entries() {
        let analyzer = TimingAnalyzer::new();
        let result = analyzer.analyze_timing(&[]).unwrap();

        assert_eq!(result.total_queries, 0);
        assert_eq!(result.total_duration, 0.0);
        assert!(result.hourly_patterns.is_empty());
    }

    #[test]
    fn test_analyze_timing_with_entries() {
        let analyzer = TimingAnalyzer::new();
        let now = Utc::now();

        let entries = vec![
            create_test_entry(now, LogLevel::Statement, Some(100.0), "statement: SELECT 1"),
            create_test_entry(now, LogLevel::Statement, Some(200.0), "statement: SELECT 2"),
            create_test_entry(now, LogLevel::Statement, Some(300.0), "statement: SELECT 3"),
        ];

        let result = analyzer.analyze_timing(&entries).unwrap();

        assert_eq!(result.total_queries, 3);
        assert_eq!(result.total_duration, 600.0);
        assert_eq!(result.average_response_time.num_milliseconds(), 200);
    }

    #[test]
    fn test_analyze_timing_events_matches_log_entry_analysis() {
        let analyzer = TimingAnalyzer::new();
        let now = Utc::now();

        let entries = vec![
            create_test_entry(now, LogLevel::Statement, Some(100.0), "statement: SELECT 1"),
            create_test_entry(
                now + Duration::seconds(1),
                LogLevel::Statement,
                Some(250.0),
                "statement: SELECT 2",
            ),
            create_test_entry(
                now + Duration::seconds(2),
                LogLevel::Log,
                None,
                "connection received",
            ),
        ];

        let events = normalize_log_entries(&entries, EventSourceKind::Stderr);
        let entry_result = analyzer.analyze_timing(&entries).unwrap();
        let event_result = analyzer.analyze_timing_events(&events).unwrap();

        assert_eq!(event_result.total_queries, entry_result.total_queries);
        assert_eq!(event_result.total_duration, entry_result.total_duration);
        assert_eq!(
            event_result.average_response_time,
            entry_result.average_response_time
        );
        assert_eq!(event_result.hourly_patterns, entry_result.hourly_patterns);
        assert_eq!(
            event_result.connection_patterns,
            entry_result.connection_patterns
        );
    }

    #[test]
    fn test_calculate_percentiles() {
        let analyzer = TimingAnalyzer::new();
        let response_times = vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0];
        let percentiles = vec![0.5, 0.95, 0.99];

        let result = analyzer
            .calculate_percentiles(&response_times, &percentiles)
            .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], (0.5, 60.0)); // floor(10 * 0.5) = index 5
        assert_eq!(result[1], (0.95, 100.0)); // floor(10 * 0.95) = index 9
        assert_eq!(result[2], (0.99, 100.0)); // floor(10 * 0.99) = index 9
    }

    #[test]
    fn test_analyze_connection_patterns() {
        let analyzer = TimingAnalyzer::new();
        let now = Utc::now();

        let entries = vec![
            create_test_entry(now, LogLevel::Log, None, "connection received"),
            create_test_entry(now, LogLevel::Log, None, "connection established"),
            create_test_entry(now, LogLevel::Error, None, "connection failed"),
        ];

        let result = analyzer.analyze_connection_patterns(&entries).unwrap();

        assert_eq!(result.total_connections, 3);
        assert_eq!(result.connection_errors, 1);
        assert_eq!(result.error_rate, 1.0 / 3.0);
    }

    #[test]
    fn test_peak_usage_analysis() {
        let analyzer = TimingAnalyzer::new();
        let now = Utc::now();

        // Three consecutive hour buckets, offset from the current time, so
        // they always fall on distinct hours of day.
        let quiet_hour = now + Duration::hours(10);
        let busy_hour = now + Duration::hours(11);
        let quietest_hour = now + Duration::hours(12);

        let mut entries = Vec::new();
        push_statements(&mut entries, quiet_hour, 5);
        push_statements(&mut entries, busy_hour, 10); // peak
        push_statements(&mut entries, quietest_hour, 3);

        let result = analyzer.get_peak_usage_analysis(&entries).unwrap();

        assert_eq!(result.max_queries_per_hour, 10);
        assert_eq!(result.max_duration_per_hour, 1000.0);
        // Only the busy hour reaches 80% of the maximum count or duration.
        assert_eq!(result.busiest_hour, Some(busy_hour.hour()));
        assert_eq!(result.peak_hours, vec![busy_hour.hour()]);
        // 18 queries over 3 hour buckets.
        assert_eq!(result.average_queries_per_hour, 6);
    }

    #[test]
    fn test_invalid_percentile() {
        let analyzer = TimingAnalyzer::new();
        let response_times = vec![10.0, 20.0, 30.0];
        let percentiles = vec![1.5]; // Invalid percentile > 1.0

        let result = analyzer.calculate_percentiles(&response_times, &percentiles);
        assert!(result.is_err());
    }
}