mockforge_analytics/queries.rs

//! High-level query API for analytics data

use crate::database::AnalyticsDatabase;
use crate::error::Result;
use crate::models::{
    AnalyticsFilter, EndpointStat, ErrorSummary, Granularity, LatencyTrend, MetricsAggregate,
    OverviewMetrics, ProtocolStat, TimeSeries, TimeSeriesPoint,
};
use chrono::{DateTime, Utc};
use sqlx::Row;
impl AnalyticsDatabase {
    /// Get overview metrics for the dashboard
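    ///
    /// # Example
    ///
    /// A minimal usage sketch; the `db` handle is an assumption (how it is
    /// constructed lives outside this module), so the example is `ignore`d:
    ///
    /// ```ignore
    /// // Overview of the last hour (3600 seconds).
    /// let overview = db.get_overview_metrics(3600).await?;
    /// println!("{} requests, {:.2}% errors", overview.total_requests, overview.error_rate);
    /// ```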
    pub async fn get_overview_metrics(&self, duration_seconds: i64) -> Result<OverviewMetrics> {
        let end_time = Utc::now().timestamp();
        let start_time = end_time - duration_seconds;

        let filter = AnalyticsFilter {
            start_time: Some(start_time),
            end_time: Some(end_time),
            ..Default::default()
        };

        let aggregates = self.get_minute_aggregates(&filter).await?;

        let total_requests: i64 = aggregates.iter().map(|a| a.request_count).sum();
        let total_errors: i64 = aggregates.iter().map(|a| a.error_count).sum();
        let error_rate = if total_requests > 0 {
            (total_errors as f64 / total_requests as f64) * 100.0
        } else {
            0.0
        };

        let total_latency: f64 = aggregates.iter().map(|a| a.latency_sum).sum();
        let avg_latency_ms = if total_requests > 0 {
            // latency_sum is the per-bucket sum of request latencies, so the
            // window-wide mean is total latency divided by total requests.
            total_latency / total_requests as f64
        } else {
            0.0
        };

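        // Note: averaging per-bucket percentiles approximates the window-wide
        // p95/p99; it is not an exact percentile over all requests.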
        let p95_latencies: Vec<f64> = aggregates.iter().filter_map(|a| a.latency_p95).collect();
        let p95_latency_ms = if p95_latencies.is_empty() {
            0.0
        } else {
            p95_latencies.iter().sum::<f64>() / p95_latencies.len() as f64
        };

        let p99_latencies: Vec<f64> = aggregates.iter().filter_map(|a| a.latency_p99).collect();
        let p99_latency_ms = if p99_latencies.is_empty() {
            0.0
        } else {
            p99_latencies.iter().sum::<f64>() / p99_latencies.len() as f64
        };

        let total_bytes_sent: i64 = aggregates.iter().map(|a| a.bytes_sent).sum();
        let total_bytes_received: i64 = aggregates.iter().map(|a| a.bytes_received).sum();

        let active_connections =
            aggregates.iter().filter_map(|a| a.active_connections).max().unwrap_or(0);

        let requests_per_second = total_requests as f64 / duration_seconds as f64;

        // Get top protocols
        let top_protocols = self.get_top_protocols(5, None).await?;

        // Get top endpoints
        let top_endpoints_data = self.get_top_endpoints(10, None).await?;
        let top_endpoints: Vec<EndpointStat> = top_endpoints_data
            .iter()
            .map(|e| {
                let error_rate = if e.total_requests > 0 {
                    (e.total_errors as f64 / e.total_requests as f64) * 100.0
                } else {
                    0.0
                };
                EndpointStat {
                    endpoint: e.endpoint.clone(),
                    protocol: e.protocol.clone(),
                    method: e.method.clone(),
                    request_count: e.total_requests,
                    error_count: e.total_errors,
                    error_rate,
                    avg_latency_ms: e.avg_latency_ms.unwrap_or(0.0),
                    p95_latency_ms: e.p95_latency_ms.unwrap_or(0.0),
                }
            })
            .collect();

        Ok(OverviewMetrics {
            total_requests,
            total_errors,
            error_rate,
            avg_latency_ms,
            p95_latency_ms,
            p99_latency_ms,
            active_connections,
            total_bytes_sent,
            total_bytes_received,
            requests_per_second,
            top_protocols,
            top_endpoints,
        })
    }

    /// Get top protocols by request count
    pub async fn get_top_protocols(
        &self,
        limit: i64,
        workspace_id: Option<&str>,
    ) -> Result<Vec<ProtocolStat>> {
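        // `WHERE 1=1` lets the optional predicates below be appended with a plain `AND`.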
        let mut query = String::from(
            r"
            SELECT
                protocol,
                SUM(request_count) as total_requests,
                SUM(error_count) as total_errors,
                AVG(latency_sum / NULLIF(request_count, 0)) as avg_latency_ms
            FROM metrics_aggregates_minute
            WHERE 1=1
            ",
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(
            "
            GROUP BY protocol
            ORDER BY total_requests DESC
            LIMIT ?
            ",
        );

        let mut sql_query = sqlx::query(&query);

        if let Some(workspace) = workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let rows = sql_query.fetch_all(self.pool()).await?;

        let mut protocols = Vec::new();
        for row in rows {
            protocols.push(ProtocolStat {
                protocol: row.get("protocol"),
                request_count: row.get("total_requests"),
                error_count: row.get("total_errors"),
                avg_latency_ms: row.try_get("avg_latency_ms").unwrap_or(0.0),
            });
        }

        Ok(protocols)
    }

    /// Get request count time series
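    ///
    /// # Example
    ///
    /// A sketch of fetching hourly per-protocol request counts for the last
    /// day; the `db` handle is an assumption, so the example is `ignore`d:
    ///
    /// ```ignore
    /// use mockforge_analytics::models::{AnalyticsFilter, Granularity};
    ///
    /// let filter = AnalyticsFilter {
    ///     start_time: Some(chrono::Utc::now().timestamp() - 86_400),
    ///     ..Default::default()
    /// };
    /// let per_protocol = db.get_request_time_series(&filter, Granularity::Hour).await?;
    /// ```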
    pub async fn get_request_time_series(
        &self,
        filter: &AnalyticsFilter,
        granularity: Granularity,
    ) -> Result<Vec<TimeSeries>> {
        let aggregates = self.get_minute_aggregates(filter).await?;

        let bucket_size = match granularity {
            Granularity::Minute => 60,
            Granularity::Hour => 3600,
            Granularity::Day => 86400,
        };

        // Group by protocol and time bucket
        let mut series_map: std::collections::HashMap<String, Vec<TimeSeriesPoint>> =
            std::collections::HashMap::new();

        for agg in aggregates {
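            // Integer division floors the timestamp down to the start of its bucket.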
            let bucket = (agg.timestamp / bucket_size) * bucket_size;
            let point = TimeSeriesPoint {
                timestamp: bucket,
                value: agg.request_count as f64,
            };

            series_map.entry(agg.protocol.clone()).or_default().push(point);
        }

        // Convert to TimeSeries objects
        let mut result: Vec<TimeSeries> = series_map
            .into_iter()
            .map(|(protocol, mut points)| {
                points.sort_by_key(|p| p.timestamp);

                // Aggregate points in the same bucket
                let mut aggregated = Vec::new();
                let mut current_bucket = None;
                let mut current_sum = 0.0;

                for point in points {
                    match current_bucket {
                        Some(bucket) if bucket == point.timestamp => {
                            current_sum += point.value;
                        }
                        _ => {
                            if let Some(bucket) = current_bucket {
                                aggregated.push(TimeSeriesPoint {
                                    timestamp: bucket,
                                    value: current_sum,
                                });
                            }
                            current_bucket = Some(point.timestamp);
                            current_sum = point.value;
                        }
                    }
                }

                if let Some(bucket) = current_bucket {
                    aggregated.push(TimeSeriesPoint {
                        timestamp: bucket,
                        value: current_sum,
                    });
                }

                TimeSeries {
                    label: protocol,
                    data: aggregated,
                }
            })
            .collect();

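        // Order series with the most data points first.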
        result.sort_by(|a, b| b.data.len().cmp(&a.data.len()));
        Ok(result)
    }

    /// Get latency trends
    pub async fn get_latency_trends(&self, filter: &AnalyticsFilter) -> Result<Vec<LatencyTrend>> {
        let aggregates = self.get_minute_aggregates(filter).await?;

        let mut trends = Vec::new();

        // Group by timestamp and aggregate
        let mut bucket_map: std::collections::HashMap<i64, Vec<&MetricsAggregate>> =
            std::collections::HashMap::new();

        for agg in &aggregates {
            bucket_map.entry(agg.timestamp).or_default().push(agg);
        }

        for (timestamp, group) in bucket_map {
            // Average only over aggregates that actually report a value, so
            // buckets with missing data do not drag the mean toward zero.
            let mean_of = |values: Vec<f64>| {
                if values.is_empty() {
                    0.0
                } else {
                    values.iter().sum::<f64>() / values.len() as f64
                }
            };

            let avg = mean_of(
                group
                    .iter()
                    .filter_map(|a| {
                        (a.request_count > 0).then(|| a.latency_sum / a.request_count as f64)
                    })
                    .collect(),
            );

            let min = group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
            let max = group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);
            let p50 = mean_of(group.iter().filter_map(|a| a.latency_p50).collect());
            let p95 = mean_of(group.iter().filter_map(|a| a.latency_p95).collect());
            let p99 = mean_of(group.iter().filter_map(|a| a.latency_p99).collect());

            trends.push(LatencyTrend {
                timestamp,
                p50,
                p95,
                p99,
                avg,
                min: if min.is_finite() { min } else { 0.0 },
                max: if max.is_finite() { max } else { 0.0 },
            });
        }

        trends.sort_by_key(|t| t.timestamp);
        Ok(trends)
    }

    /// Get error summary
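    ///
    /// Returns at most `limit` error groups, most frequent first.
    ///
    /// # Example
    ///
    /// A sketch, assuming a `db` handle and a default filter (hence `ignore`d):
    ///
    /// ```ignore
    /// let top_errors = db.get_error_summary(&AnalyticsFilter::default(), 10).await?;
    /// for e in &top_errors {
    ///     println!("{}/{}: {} occurrences", e.error_category, e.error_type, e.count);
    /// }
    /// ```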
    pub async fn get_error_summary(
        &self,
        filter: &AnalyticsFilter,
        limit: i64,
    ) -> Result<Vec<ErrorSummary>> {
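        // Summarize over at most the 1,000 most recent matching errors.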
        let errors = self.get_recent_errors(1000, filter).await?;

        // Group by error type
        let mut error_map: std::collections::HashMap<
            String,
            (i64, std::collections::HashSet<String>, i64),
        > = std::collections::HashMap::new();

        for error in errors {
            let error_type = error.error_type.clone().unwrap_or_else(|| "unknown".to_string());
            let error_category =
                error.error_category.clone().unwrap_or_else(|| "other".to_string());
            let endpoint = error.endpoint.clone().unwrap_or_default();

            let entry = error_map.entry(format!("{error_category}:{error_type}")).or_insert((
                0,
                std::collections::HashSet::new(),
                0,
            ));

            entry.0 += 1;
            entry.1.insert(endpoint);
            entry.2 = entry.2.max(error.timestamp);
        }

        let mut summaries: Vec<ErrorSummary> = error_map
            .into_iter()
            .map(|(key, (count, endpoints, last_ts))| {
                // split_once keeps any ':' inside the error type intact.
                let (category, error_type) = key.split_once(':').unwrap_or(("other", "unknown"));
                ErrorSummary {
                    error_type: error_type.to_string(),
                    error_category: category.to_string(),
                    count,
                    endpoints: endpoints.into_iter().collect(),
                    last_occurrence: DateTime::from_timestamp(last_ts, 0).unwrap_or_else(Utc::now),
                }
            })
            .collect();

        summaries.sort_by(|a, b| b.count.cmp(&a.count));
        summaries.truncate(limit as usize);

        Ok(summaries)
    }
}