1use crate::database::AnalyticsDatabase;
4use crate::error::Result;
5use crate::models::{
6 AnalyticsFilter, EndpointStat, ErrorSummary, Granularity, LatencyTrend, MetricsAggregate,
7 OverviewMetrics, ProtocolStat, TimeSeries, TimeSeriesPoint,
8};
9use chrono::{DateTime, Utc};
10use sqlx::Row;
11
12impl AnalyticsDatabase {
13 pub async fn get_overview_metrics(&self, duration_seconds: i64) -> Result<OverviewMetrics> {
15 let end_time = Utc::now().timestamp();
16 let start_time = end_time - duration_seconds;
17
18 let filter = AnalyticsFilter {
19 start_time: Some(start_time),
20 end_time: Some(end_time),
21 ..Default::default()
22 };
23
24 let aggregates = self.get_minute_aggregates(&filter).await?;
25
26 let total_requests: i64 = aggregates.iter().map(|a| a.request_count).sum();
27 let total_errors: i64 = aggregates.iter().map(|a| a.error_count).sum();
28 let error_rate = if total_requests > 0 {
29 (total_errors as f64 / total_requests as f64) * 100.0
30 } else {
31 0.0
32 };
33
34 let total_latency: f64 = aggregates.iter().map(|a| a.latency_sum).sum();
35 let latency_count: i64 = aggregates.iter().filter(|a| a.latency_sum > 0.0).count() as i64;
36 let avg_latency_ms = if latency_count > 0 {
37 total_latency / latency_count as f64
38 } else {
39 0.0
40 };
41
42 let p95_latencies: Vec<f64> = aggregates.iter().filter_map(|a| a.latency_p95).collect();
43 let p95_latency_ms = if p95_latencies.is_empty() {
44 0.0
45 } else {
46 p95_latencies.iter().sum::<f64>() / p95_latencies.len() as f64
47 };
48
49 let p99_latencies: Vec<f64> = aggregates.iter().filter_map(|a| a.latency_p99).collect();
50 let p99_latency_ms = if p99_latencies.is_empty() {
51 0.0
52 } else {
53 p99_latencies.iter().sum::<f64>() / p99_latencies.len() as f64
54 };
55
56 let total_bytes_sent: i64 = aggregates.iter().map(|a| a.bytes_sent).sum();
57 let total_bytes_received: i64 = aggregates.iter().map(|a| a.bytes_received).sum();
58
59 let active_connections =
60 aggregates.iter().filter_map(|a| a.active_connections).max().unwrap_or(0);
61
62 let requests_per_second = total_requests as f64 / duration_seconds as f64;
63
64 let top_protocols = self.get_top_protocols(5, None).await?;
66
67 let top_endpoints_data = self.get_top_endpoints(10, None).await?;
69 let top_endpoints: Vec<EndpointStat> = top_endpoints_data
70 .iter()
71 .map(|e| {
72 let error_rate = if e.total_requests > 0 {
73 (e.total_errors as f64 / e.total_requests as f64) * 100.0
74 } else {
75 0.0
76 };
77 EndpointStat {
78 endpoint: e.endpoint.clone(),
79 protocol: e.protocol.clone(),
80 method: e.method.clone(),
81 request_count: e.total_requests,
82 error_count: e.total_errors,
83 error_rate,
84 avg_latency_ms: e.avg_latency_ms.unwrap_or(0.0),
85 p95_latency_ms: e.p95_latency_ms.unwrap_or(0.0),
86 }
87 })
88 .collect();
89
90 Ok(OverviewMetrics {
91 total_requests,
92 total_errors,
93 error_rate,
94 avg_latency_ms,
95 p95_latency_ms,
96 p99_latency_ms,
97 active_connections,
98 total_bytes_sent,
99 total_bytes_received,
100 requests_per_second,
101 top_protocols,
102 top_endpoints,
103 })
104 }
105
106 pub async fn get_top_protocols(
108 &self,
109 limit: i64,
110 workspace_id: Option<&str>,
111 ) -> Result<Vec<ProtocolStat>> {
112 let mut query = String::from(
113 r"
114 SELECT
115 protocol,
116 SUM(request_count) as total_requests,
117 SUM(error_count) as total_errors,
118 AVG(latency_sum / NULLIF(request_count, 0)) as avg_latency_ms
119 FROM metrics_aggregates_minute
120 WHERE 1=1
121 ",
122 );
123
124 if workspace_id.is_some() {
125 query.push_str(" AND workspace_id = ?");
126 }
127
128 query.push_str(
129 "
130 GROUP BY protocol
131 ORDER BY total_requests DESC
132 LIMIT ?
133 ",
134 );
135
136 let mut sql_query = sqlx::query(&query);
137
138 if let Some(workspace) = workspace_id {
139 sql_query = sql_query.bind(workspace);
140 }
141
142 sql_query = sql_query.bind(limit);
143
144 let rows = sql_query.fetch_all(self.pool()).await?;
145
146 let mut protocols = Vec::new();
147 for row in rows {
148 protocols.push(ProtocolStat {
149 protocol: row.get("protocol"),
150 request_count: row.get("total_requests"),
151 error_count: row.get("total_errors"),
152 avg_latency_ms: row.try_get("avg_latency_ms").unwrap_or(0.0),
153 });
154 }
155
156 Ok(protocols)
157 }
158
159 pub async fn get_request_time_series(
161 &self,
162 filter: &AnalyticsFilter,
163 granularity: Granularity,
164 ) -> Result<Vec<TimeSeries>> {
165 let aggregates = self.get_minute_aggregates(filter).await?;
166
167 let bucket_size = match granularity {
168 Granularity::Minute => 60,
169 Granularity::Hour => 3600,
170 Granularity::Day => 86400,
171 };
172
173 let mut series_map: std::collections::HashMap<String, Vec<TimeSeriesPoint>> =
175 std::collections::HashMap::new();
176
177 for agg in aggregates {
178 let bucket = (agg.timestamp / bucket_size) * bucket_size;
179 let point = TimeSeriesPoint {
180 timestamp: bucket,
181 value: agg.request_count as f64,
182 };
183
184 series_map.entry(agg.protocol.clone()).or_default().push(point);
185 }
186
187 let mut result: Vec<TimeSeries> = series_map
189 .into_iter()
190 .map(|(protocol, mut points)| {
191 points.sort_by_key(|p| p.timestamp);
192
193 let mut aggregated = Vec::new();
195 let mut current_bucket = None;
196 let mut current_sum = 0.0;
197
198 for point in points {
199 match current_bucket {
200 Some(bucket) if bucket == point.timestamp => {
201 current_sum += point.value;
202 }
203 _ => {
204 if let Some(bucket) = current_bucket {
205 aggregated.push(TimeSeriesPoint {
206 timestamp: bucket,
207 value: current_sum,
208 });
209 }
210 current_bucket = Some(point.timestamp);
211 current_sum = point.value;
212 }
213 }
214 }
215
216 if let Some(bucket) = current_bucket {
217 aggregated.push(TimeSeriesPoint {
218 timestamp: bucket,
219 value: current_sum,
220 });
221 }
222
223 TimeSeries {
224 label: protocol,
225 data: aggregated,
226 }
227 })
228 .collect();
229
230 result.sort_by(|a, b| b.data.len().cmp(&a.data.len()));
231 Ok(result)
232 }
233
234 pub async fn get_latency_trends(&self, filter: &AnalyticsFilter) -> Result<Vec<LatencyTrend>> {
236 let aggregates = self.get_minute_aggregates(filter).await?;
237
238 let mut trends = Vec::new();
239
240 let mut bucket_map: std::collections::HashMap<i64, Vec<&MetricsAggregate>> =
242 std::collections::HashMap::new();
243
244 for agg in &aggregates {
245 bucket_map.entry(agg.timestamp).or_default().push(agg);
246 }
247
248 for (timestamp, group) in bucket_map {
249 let avg = group
250 .iter()
251 .filter_map(|a| {
252 if a.request_count > 0 {
253 Some(a.latency_sum / a.request_count as f64)
254 } else {
255 None
256 }
257 })
258 .sum::<f64>()
259 / group.len() as f64;
260
261 let min = group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
262 let max = group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);
263 let p50 = group.iter().filter_map(|a| a.latency_p50).sum::<f64>() / group.len() as f64;
264 let p95 = group.iter().filter_map(|a| a.latency_p95).sum::<f64>() / group.len() as f64;
265 let p99 = group.iter().filter_map(|a| a.latency_p99).sum::<f64>() / group.len() as f64;
266
267 trends.push(LatencyTrend {
268 timestamp,
269 p50,
270 p95,
271 p99,
272 avg,
273 min: if min.is_finite() { min } else { 0.0 },
274 max: if max.is_finite() { max } else { 0.0 },
275 });
276 }
277
278 trends.sort_by_key(|t| t.timestamp);
279 Ok(trends)
280 }
281
282 pub async fn get_error_summary(
284 &self,
285 filter: &AnalyticsFilter,
286 limit: i64,
287 ) -> Result<Vec<ErrorSummary>> {
288 let errors = self.get_recent_errors(1000, filter).await?;
289
290 let mut error_map: std::collections::HashMap<
292 String,
293 (i64, std::collections::HashSet<String>, i64),
294 > = std::collections::HashMap::new();
295
296 for error in errors {
297 let error_type = error.error_type.clone().unwrap_or_else(|| "unknown".to_string());
298 let error_category =
299 error.error_category.clone().unwrap_or_else(|| "other".to_string());
300 let endpoint = error.endpoint.clone().unwrap_or_default();
301
302 let entry = error_map.entry(format!("{error_category}:{error_type}")).or_insert((
303 0,
304 std::collections::HashSet::new(),
305 0,
306 ));
307
308 entry.0 += 1;
309 entry.1.insert(endpoint);
310 entry.2 = entry.2.max(error.timestamp);
311 }
312
313 let mut summaries: Vec<ErrorSummary> = error_map
314 .into_iter()
315 .map(|(key, (count, endpoints, last_ts))| {
316 let parts: Vec<&str> = key.split(':').collect();
317 ErrorSummary {
318 error_type: (*parts.get(1).unwrap_or(&"unknown")).to_string(),
319 error_category: (*parts.first().unwrap_or(&"other")).to_string(),
320 count,
321 endpoints: endpoints.into_iter().collect(),
322 last_occurrence: DateTime::from_timestamp(last_ts, 0).unwrap_or_else(Utc::now),
323 }
324 })
325 .collect();
326
327 summaries.sort_by(|a, b| b.count.cmp(&a.count));
328 summaries.truncate(limit as usize);
329
330 Ok(summaries)
331 }
332}