use crate::database::AnalyticsDatabase;
use crate::error::Result;
use crate::models::*;
use chrono::{DateTime, Utc};
use sqlx::Row;

impl AnalyticsDatabase {
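    /// Builds a dashboard-style summary of the trailing `duration_seconds`
    /// window: request/error totals, latency statistics, byte counts, peak
    /// active connections, and the top protocols and endpoints.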
    pub async fn get_overview_metrics(&self, duration_seconds: i64) -> Result<OverviewMetrics> {
        let end_time = Utc::now().timestamp();
        let start_time = end_time - duration_seconds;

        let filter = AnalyticsFilter {
            start_time: Some(start_time),
            end_time: Some(end_time),
            ..Default::default()
        };

        let aggregates = self.get_minute_aggregates(&filter).await?;

        let total_requests: i64 = aggregates.iter().map(|a| a.request_count).sum();
        let total_errors: i64 = aggregates.iter().map(|a| a.error_count).sum();
        let error_rate = if total_requests > 0 {
            (total_errors as f64 / total_requests as f64) * 100.0
        } else {
            0.0
        };

        // `latency_sum` is the per-minute sum of request latencies, so the
        // overall average must be weighted by request count rather than by
        // the number of minute buckets.
        let total_latency: f64 = aggregates.iter().map(|a| a.latency_sum).sum();
        let avg_latency_ms = if total_requests > 0 {
            total_latency / total_requests as f64
        } else {
            0.0
        };
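        // Averaging the stored per-minute p95/p99 values approximates the
        // window-wide percentiles; it is not an exact percentile over all
        // requests.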
        let p95_latencies: Vec<f64> = aggregates.iter().filter_map(|a| a.latency_p95).collect();
        let p95_latency_ms = if !p95_latencies.is_empty() {
            p95_latencies.iter().sum::<f64>() / p95_latencies.len() as f64
        } else {
            0.0
        };

        let p99_latencies: Vec<f64> = aggregates.iter().filter_map(|a| a.latency_p99).collect();
        let p99_latency_ms = if !p99_latencies.is_empty() {
            p99_latencies.iter().sum::<f64>() / p99_latencies.len() as f64
        } else {
            0.0
        };

        let total_bytes_sent: i64 = aggregates.iter().map(|a| a.bytes_sent).sum();
        let total_bytes_received: i64 = aggregates.iter().map(|a| a.bytes_received).sum();

        let active_connections =
            aggregates.iter().filter_map(|a| a.active_connections).max().unwrap_or(0);

        // Guard against a zero-length window so the rate cannot become NaN
        // or infinite.
        let requests_per_second = if duration_seconds > 0 {
            total_requests as f64 / duration_seconds as f64
        } else {
            0.0
        };

        // These top lists are not time-filtered: `get_top_protocols` (below)
        // aggregates over the entire rollup table.
        let top_protocols = self.get_top_protocols(5, None).await?;

        let top_endpoints_data = self.get_top_endpoints(10, None).await?;
        let top_endpoints: Vec<EndpointStat> = top_endpoints_data
            .iter()
            .map(|e| {
                let error_rate = if e.total_requests > 0 {
                    (e.total_errors as f64 / e.total_requests as f64) * 100.0
                } else {
                    0.0
                };
                EndpointStat {
                    endpoint: e.endpoint.clone(),
                    protocol: e.protocol.clone(),
                    method: e.method.clone(),
                    request_count: e.total_requests,
                    error_count: e.total_errors,
                    error_rate,
                    avg_latency_ms: e.avg_latency_ms.unwrap_or(0.0),
                    p95_latency_ms: e.p95_latency_ms.unwrap_or(0.0),
                }
            })
            .collect();

        Ok(OverviewMetrics {
            total_requests,
            total_errors,
            error_rate,
            avg_latency_ms,
            p95_latency_ms,
            p99_latency_ms,
            active_connections,
            total_bytes_sent,
            total_bytes_received,
            requests_per_second,
            top_protocols,
            top_endpoints,
        })
    }
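    /// Returns up to `limit` protocols ordered by request volume, optionally
    /// scoped to a single workspace, aggregated from the per-minute rollup
    /// table.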
    pub async fn get_top_protocols(
        &self,
        limit: i64,
        workspace_id: Option<&str>,
    ) -> Result<Vec<ProtocolStat>> {
        let mut query = String::from(
            r"
            SELECT
                protocol,
                SUM(request_count) as total_requests,
                SUM(error_count) as total_errors,
                AVG(latency_sum / NULLIF(request_count, 0)) as avg_latency_ms
            FROM metrics_aggregates_minute
            WHERE 1=1
            ",
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(
            "
            GROUP BY protocol
            ORDER BY total_requests DESC
            LIMIT ?
            ",
        );
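        // Bind values in the same order their `?` placeholders appear in the
        // SQL: the optional workspace filter first, then the LIMIT.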
        let mut sql_query = sqlx::query(&query);

        if let Some(workspace) = workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let rows = sql_query.fetch_all(self.pool()).await?;

        let mut protocols = Vec::new();
        for row in rows {
            protocols.push(ProtocolStat {
                protocol: row.get("protocol"),
                request_count: row.get("total_requests"),
                error_count: row.get("total_errors"),
                avg_latency_ms: row.try_get("avg_latency_ms").unwrap_or(0.0),
            });
        }

        Ok(protocols)
    }
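    /// Builds one request-count time series per protocol, re-bucketing the
    /// per-minute aggregates to the requested `granularity`.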
    pub async fn get_request_time_series(
        &self,
        filter: &AnalyticsFilter,
        granularity: Granularity,
    ) -> Result<Vec<TimeSeries>> {
        let aggregates = self.get_minute_aggregates(filter).await?;

        let bucket_size = match granularity {
            Granularity::Minute => 60,
            Granularity::Hour => 3600,
            Granularity::Day => 86400,
        };

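        // Truncate each minute timestamp down to the start of its bucket via
        // integer division; points landing in the same bucket are summed per
        // protocol below.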
        let mut series_map: std::collections::HashMap<String, Vec<TimeSeriesPoint>> =
            std::collections::HashMap::new();

        for agg in aggregates {
            let bucket = (agg.timestamp / bucket_size) * bucket_size;
            let point = TimeSeriesPoint {
                timestamp: bucket,
                value: agg.request_count as f64,
            };

            series_map.entry(agg.protocol.clone()).or_default().push(point);
        }

        let mut result: Vec<TimeSeries> = series_map
            .into_iter()
            .map(|(protocol, mut points)| {
                points.sort_by_key(|p| p.timestamp);

                // Merge consecutive points that share a bucket timestamp into
                // a single summed point.
                let mut aggregated = Vec::new();
                let mut current_bucket = None;
                let mut current_sum = 0.0;

                for point in points {
                    match current_bucket {
                        Some(bucket) if bucket == point.timestamp => {
                            current_sum += point.value;
                        }
                        _ => {
                            if let Some(bucket) = current_bucket {
                                aggregated.push(TimeSeriesPoint {
                                    timestamp: bucket,
                                    value: current_sum,
                                });
                            }
                            current_bucket = Some(point.timestamp);
                            current_sum = point.value;
                        }
                    }
                }

                // Flush the final open bucket.
                if let Some(bucket) = current_bucket {
                    aggregated.push(TimeSeriesPoint {
                        timestamp: bucket,
                        value: current_sum,
                    });
                }

                TimeSeries {
                    label: protocol,
                    data: aggregated,
                }
            })
            .collect();

        // Series with the most data points come first.
        result.sort_by(|a, b| b.data.len().cmp(&a.data.len()));
        Ok(result)
    }
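    /// Computes a latency trend point per minute bucket: average, min, max,
    /// and the mean of the stored p50/p95/p99 values across that bucket's
    /// aggregates.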
    pub async fn get_latency_trends(&self, filter: &AnalyticsFilter) -> Result<Vec<LatencyTrend>> {
        let aggregates = self.get_minute_aggregates(filter).await?;

        let mut trends = Vec::new();

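        // Group aggregates by their minute timestamp; a group may hold
        // several rows for the same minute (e.g. one per protocol).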
        let mut bucket_map: std::collections::HashMap<i64, Vec<&MetricsAggregate>> =
            std::collections::HashMap::new();

        for agg in &aggregates {
            bucket_map.entry(agg.timestamp).or_default().push(agg);
        }

        for (timestamp, group) in bucket_map {
            // Average over only the aggregates that actually carry a value;
            // dividing by the full group size would understate the figures
            // whenever some aggregates are empty or missing percentiles.
            let mean = |vals: Vec<f64>| -> f64 {
                if vals.is_empty() {
                    0.0
                } else {
                    vals.iter().sum::<f64>() / vals.len() as f64
                }
            };

            let avg = mean(
                group
                    .iter()
                    .filter_map(|a| {
                        if a.request_count > 0 {
                            Some(a.latency_sum / a.request_count as f64)
                        } else {
                            None
                        }
                    })
                    .collect(),
            );

            let min = group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
            let max = group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);
            let p50 = mean(group.iter().filter_map(|a| a.latency_p50).collect());
            let p95 = mean(group.iter().filter_map(|a| a.latency_p95).collect());
            let p99 = mean(group.iter().filter_map(|a| a.latency_p99).collect());

            trends.push(LatencyTrend {
                timestamp,
                p50,
                p95,
                p99,
                avg,
                min: if min.is_finite() { min } else { 0.0 },
                max: if max.is_finite() { max } else { 0.0 },
            });
        }

        trends.sort_by_key(|t| t.timestamp);
        Ok(trends)
    }
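    /// Groups recent errors by category and type and returns the `limit` most
    /// frequent groups, each with the endpoints it affected and its most
    /// recent occurrence.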
    pub async fn get_error_summary(
        &self,
        filter: &AnalyticsFilter,
        limit: i64,
    ) -> Result<Vec<ErrorSummary>> {
        // Summarize over at most the 1000 most recent matching errors.
        let errors = self.get_recent_errors(1000, filter).await?;

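        // Group by a (category, type) tuple key; using a tuple rather than a
        // joined "category:type" string keeps error types containing ':'
        // from corrupting the grouping.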
        // Value per key: (occurrence count, distinct endpoints hit, most
        // recent unix timestamp).
        let mut error_map: std::collections::HashMap<
            (String, String),
            (i64, std::collections::HashSet<String>, i64),
        > = std::collections::HashMap::new();

        for error in errors {
            let error_type = error.error_type.clone().unwrap_or_else(|| "unknown".to_string());
            let error_category =
                error.error_category.clone().unwrap_or_else(|| "other".to_string());
            let endpoint = error.endpoint.clone().unwrap_or_default();

            let entry = error_map.entry((error_category, error_type)).or_insert((
                0,
                std::collections::HashSet::new(),
                0,
            ));

            entry.0 += 1;
            entry.1.insert(endpoint);
            entry.2 = entry.2.max(error.timestamp);
        }

        let mut summaries: Vec<ErrorSummary> = error_map
            .into_iter()
            .map(|((error_category, error_type), (count, endpoints, last_ts))| ErrorSummary {
                error_type,
                error_category,
                count,
                endpoints: endpoints.into_iter().collect(),
                last_occurrence: DateTime::from_timestamp(last_ts, 0).unwrap_or_else(Utc::now),
            })
            .collect();

        summaries.sort_by(|a, b| b.count.cmp(&a.count));
        summaries.truncate(limit as usize);

        Ok(summaries)
    }
}
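
// A minimal usage sketch (hypothetical; not part of the original module). It
// only exercises the queries defined above; how an `AnalyticsDatabase` handle
// is constructed is outside this file's scope.
#[allow(dead_code)]
async fn dashboard_snapshot(db: &AnalyticsDatabase) -> Result<()> {
    // Overview of the last hour.
    let overview = db.get_overview_metrics(3600).await?;
    println!(
        "{} requests, {:.2}% errors, p95 {:.1} ms",
        overview.total_requests, overview.error_rate, overview.p95_latency_ms
    );

    // Per-minute request volume over the same window, one series per protocol.
    let now = Utc::now().timestamp();
    let filter = AnalyticsFilter {
        start_time: Some(now - 3600),
        end_time: Some(now),
        ..Default::default()
    };
    for series in db.get_request_time_series(&filter, Granularity::Minute).await? {
        println!("{}: {} data points", series.label, series.data.len());
    }

    Ok(())
}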