mockforge_analytics/
queries.rs

1//! High-level query API for analytics data
2
3use crate::database::AnalyticsDatabase;
4use crate::error::Result;
5use crate::models::*;
6use chrono::{DateTime, Utc};
7use sqlx::Row;
8
9impl AnalyticsDatabase {
10    /// Get overview metrics for the dashboard
11    pub async fn get_overview_metrics(&self, duration_seconds: i64) -> Result<OverviewMetrics> {
12        let end_time = Utc::now().timestamp();
13        let start_time = end_time - duration_seconds;
14
15        let filter = AnalyticsFilter {
16            start_time: Some(start_time),
17            end_time: Some(end_time),
18            ..Default::default()
19        };
20
21        let aggregates = self.get_minute_aggregates(&filter).await?;
22
23        let total_requests: i64 = aggregates.iter().map(|a| a.request_count).sum();
24        let total_errors: i64 = aggregates.iter().map(|a| a.error_count).sum();
25        let error_rate = if total_requests > 0 {
26            (total_errors as f64 / total_requests as f64) * 100.0
27        } else {
28            0.0
29        };
30
31        let total_latency: f64 = aggregates.iter().map(|a| a.latency_sum).sum();
32        let latency_count: i64 = aggregates.iter().filter(|a| a.latency_sum > 0.0).count() as i64;
33        let avg_latency_ms = if latency_count > 0 {
34            total_latency / latency_count as f64
35        } else {
36            0.0
37        };
38
39        let p95_latencies: Vec<f64> = aggregates.iter().filter_map(|a| a.latency_p95).collect();
40        let p95_latency_ms = if !p95_latencies.is_empty() {
41            p95_latencies.iter().sum::<f64>() / p95_latencies.len() as f64
42        } else {
43            0.0
44        };
45
46        let p99_latencies: Vec<f64> = aggregates.iter().filter_map(|a| a.latency_p99).collect();
47        let p99_latency_ms = if !p99_latencies.is_empty() {
48            p99_latencies.iter().sum::<f64>() / p99_latencies.len() as f64
49        } else {
50            0.0
51        };
52
53        let total_bytes_sent: i64 = aggregates.iter().map(|a| a.bytes_sent).sum();
54        let total_bytes_received: i64 = aggregates.iter().map(|a| a.bytes_received).sum();
55
56        let active_connections =
57            aggregates.iter().filter_map(|a| a.active_connections).max().unwrap_or(0);
58
59        let requests_per_second = total_requests as f64 / duration_seconds as f64;
60
61        // Get top protocols
62        let top_protocols = self.get_top_protocols(5, None).await?;
63
64        // Get top endpoints
65        let top_endpoints_data = self.get_top_endpoints(10, None).await?;
66        let top_endpoints: Vec<EndpointStat> = top_endpoints_data
67            .iter()
68            .map(|e| {
69                let error_rate = if e.total_requests > 0 {
70                    (e.total_errors as f64 / e.total_requests as f64) * 100.0
71                } else {
72                    0.0
73                };
74                EndpointStat {
75                    endpoint: e.endpoint.clone(),
76                    protocol: e.protocol.clone(),
77                    method: e.method.clone(),
78                    request_count: e.total_requests,
79                    error_count: e.total_errors,
80                    error_rate,
81                    avg_latency_ms: e.avg_latency_ms.unwrap_or(0.0),
82                    p95_latency_ms: e.p95_latency_ms.unwrap_or(0.0),
83                }
84            })
85            .collect();
86
87        Ok(OverviewMetrics {
88            total_requests,
89            total_errors,
90            error_rate,
91            avg_latency_ms,
92            p95_latency_ms,
93            p99_latency_ms,
94            active_connections,
95            total_bytes_sent,
96            total_bytes_received,
97            requests_per_second,
98            top_protocols,
99            top_endpoints,
100        })
101    }
102
103    /// Get top protocols by request count
104    pub async fn get_top_protocols(
105        &self,
106        limit: i64,
107        workspace_id: Option<&str>,
108    ) -> Result<Vec<ProtocolStat>> {
109        let mut query = String::from(
110            r"
111            SELECT
112                protocol,
113                SUM(request_count) as total_requests,
114                SUM(error_count) as total_errors,
115                AVG(latency_sum / NULLIF(request_count, 0)) as avg_latency_ms
116            FROM metrics_aggregates_minute
117            WHERE 1=1
118            ",
119        );
120
121        if workspace_id.is_some() {
122            query.push_str(" AND workspace_id = ?");
123        }
124
125        query.push_str(
126            "
127            GROUP BY protocol
128            ORDER BY total_requests DESC
129            LIMIT ?
130            ",
131        );
132
133        let mut sql_query = sqlx::query(&query);
134
135        if let Some(workspace) = workspace_id {
136            sql_query = sql_query.bind(workspace);
137        }
138
139        sql_query = sql_query.bind(limit);
140
141        let rows = sql_query.fetch_all(self.pool()).await?;
142
143        let mut protocols = Vec::new();
144        for row in rows {
145            protocols.push(ProtocolStat {
146                protocol: row.get("protocol"),
147                request_count: row.get("total_requests"),
148                error_count: row.get("total_errors"),
149                avg_latency_ms: row.try_get("avg_latency_ms").unwrap_or(0.0),
150            });
151        }
152
153        Ok(protocols)
154    }
155
156    /// Get request count time series
157    pub async fn get_request_time_series(
158        &self,
159        filter: &AnalyticsFilter,
160        granularity: Granularity,
161    ) -> Result<Vec<TimeSeries>> {
162        let aggregates = self.get_minute_aggregates(filter).await?;
163
164        let bucket_size = match granularity {
165            Granularity::Minute => 60,
166            Granularity::Hour => 3600,
167            Granularity::Day => 86400,
168        };
169
170        // Group by protocol and time bucket
171        let mut series_map: std::collections::HashMap<String, Vec<TimeSeriesPoint>> =
172            std::collections::HashMap::new();
173
174        for agg in aggregates {
175            let bucket = (agg.timestamp / bucket_size) * bucket_size;
176            let point = TimeSeriesPoint {
177                timestamp: bucket,
178                value: agg.request_count as f64,
179            };
180
181            series_map.entry(agg.protocol.clone()).or_default().push(point);
182        }
183
184        // Convert to TimeSeries objects
185        let mut result: Vec<TimeSeries> = series_map
186            .into_iter()
187            .map(|(protocol, mut points)| {
188                points.sort_by_key(|p| p.timestamp);
189
190                // Aggregate points in the same bucket
191                let mut aggregated = Vec::new();
192                let mut current_bucket = None;
193                let mut current_sum = 0.0;
194
195                for point in points {
196                    match current_bucket {
197                        Some(bucket) if bucket == point.timestamp => {
198                            current_sum += point.value;
199                        }
200                        _ => {
201                            if let Some(bucket) = current_bucket {
202                                aggregated.push(TimeSeriesPoint {
203                                    timestamp: bucket,
204                                    value: current_sum,
205                                });
206                            }
207                            current_bucket = Some(point.timestamp);
208                            current_sum = point.value;
209                        }
210                    }
211                }
212
213                if let Some(bucket) = current_bucket {
214                    aggregated.push(TimeSeriesPoint {
215                        timestamp: bucket,
216                        value: current_sum,
217                    });
218                }
219
220                TimeSeries {
221                    label: protocol,
222                    data: aggregated,
223                }
224            })
225            .collect();
226
227        result.sort_by(|a, b| b.data.len().cmp(&a.data.len()));
228        Ok(result)
229    }
230
231    /// Get latency trends
232    pub async fn get_latency_trends(&self, filter: &AnalyticsFilter) -> Result<Vec<LatencyTrend>> {
233        let aggregates = self.get_minute_aggregates(filter).await?;
234
235        let mut trends = Vec::new();
236
237        // Group by timestamp and aggregate
238        let mut bucket_map: std::collections::HashMap<i64, Vec<&MetricsAggregate>> =
239            std::collections::HashMap::new();
240
241        for agg in &aggregates {
242            bucket_map.entry(agg.timestamp).or_default().push(agg);
243        }
244
245        for (timestamp, group) in bucket_map {
246            let avg = group
247                .iter()
248                .filter_map(|a| {
249                    if a.request_count > 0 {
250                        Some(a.latency_sum / a.request_count as f64)
251                    } else {
252                        None
253                    }
254                })
255                .sum::<f64>()
256                / group.len() as f64;
257
258            let min = group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
259            let max = group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);
260            let p50 = group.iter().filter_map(|a| a.latency_p50).sum::<f64>() / group.len() as f64;
261            let p95 = group.iter().filter_map(|a| a.latency_p95).sum::<f64>() / group.len() as f64;
262            let p99 = group.iter().filter_map(|a| a.latency_p99).sum::<f64>() / group.len() as f64;
263
264            trends.push(LatencyTrend {
265                timestamp,
266                p50,
267                p95,
268                p99,
269                avg,
270                min: if min.is_finite() { min } else { 0.0 },
271                max: if max.is_finite() { max } else { 0.0 },
272            });
273        }
274
275        trends.sort_by_key(|t| t.timestamp);
276        Ok(trends)
277    }
278
279    /// Get error summary
280    pub async fn get_error_summary(
281        &self,
282        filter: &AnalyticsFilter,
283        limit: i64,
284    ) -> Result<Vec<ErrorSummary>> {
285        let errors = self.get_recent_errors(1000, filter).await?;
286
287        // Group by error type
288        let mut error_map: std::collections::HashMap<
289            String,
290            (i64, std::collections::HashSet<String>, i64),
291        > = std::collections::HashMap::new();
292
293        for error in errors {
294            let error_type = error.error_type.clone().unwrap_or_else(|| "unknown".to_string());
295            let error_category =
296                error.error_category.clone().unwrap_or_else(|| "other".to_string());
297            let endpoint = error.endpoint.clone().unwrap_or_default();
298
299            let entry = error_map.entry(format!("{}:{}", error_category, error_type)).or_insert((
300                0,
301                std::collections::HashSet::new(),
302                0,
303            ));
304
305            entry.0 += 1;
306            entry.1.insert(endpoint);
307            entry.2 = entry.2.max(error.timestamp);
308        }
309
310        let mut summaries: Vec<ErrorSummary> = error_map
311            .into_iter()
312            .map(|(key, (count, endpoints, last_ts))| {
313                let parts: Vec<&str> = key.split(':').collect();
314                ErrorSummary {
315                    error_type: parts.get(1).unwrap_or(&"unknown").to_string(),
316                    error_category: parts.first().unwrap_or(&"other").to_string(),
317                    count,
318                    endpoints: endpoints.into_iter().collect(),
319                    last_occurrence: DateTime::from_timestamp(last_ts, 0)
320                        .unwrap_or_else(|| Utc::now()),
321                }
322            })
323            .collect();
324
325        summaries.sort_by(|a, b| b.count.cmp(&a.count));
326        summaries.truncate(limit as usize);
327
328        Ok(summaries)
329    }
330}