1use super::accounts::DEFAULT_ACCOUNT_ID;
8use super::DbPool;
9use crate::error::StorageError;
10use serde::Serialize;
11use std::collections::HashMap;
12
/// One persisted telemetry row from the `mcp_telemetry` table, as read back
/// by [`get_recent_entries_for`].
#[derive(Debug, Clone, sqlx::FromRow, Serialize)]
pub struct TelemetryEntry {
    /// Database-assigned row id.
    pub id: i64,
    /// Name of the tool that was invoked.
    pub tool_name: String,
    /// Tool category (e.g. grouping used for metrics breakdowns).
    pub category: String,
    /// Call latency in milliseconds.
    pub latency_ms: i64,
    /// Whether the call succeeded.
    pub success: bool,
    /// Error code when the call failed; `None` on success or when unset.
    pub error_code: Option<String>,
    /// Policy engine decision recorded for the call, if any.
    pub policy_decision: Option<String>,
    /// Free-form metadata payload, if any (stored as text).
    pub metadata: Option<String>,
    /// Row creation timestamp as stored in the database (string form;
    /// compared lexically by the `since` filters in this module).
    pub created_at: String,
}
26
/// Borrowed input for logging a single telemetry event; see
/// [`log_telemetry_for`]. Mirrors the writable columns of `mcp_telemetry`.
pub struct TelemetryParams<'a> {
    /// Name of the tool that was invoked.
    pub tool_name: &'a str,
    /// Tool category used for grouping in metrics queries.
    pub category: &'a str,
    /// Call latency in milliseconds (stored as INTEGER).
    pub latency_ms: u64,
    /// Whether the call succeeded.
    pub success: bool,
    /// Error code for failed calls, if known.
    pub error_code: Option<&'a str>,
    /// Policy engine decision for this call, if any.
    pub policy_decision: Option<&'a str>,
    /// Optional free-form metadata (stored as text).
    pub metadata: Option<&'a str>,
}
37
38pub async fn log_telemetry_for(
40 pool: &DbPool,
41 account_id: &str,
42 params: &TelemetryParams<'_>,
43) -> Result<(), StorageError> {
44 sqlx::query(
45 "INSERT INTO mcp_telemetry \
46 (account_id, tool_name, category, latency_ms, success, error_code, policy_decision, metadata) \
47 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
48 )
49 .bind(account_id)
50 .bind(params.tool_name)
51 .bind(params.category)
52 .bind(params.latency_ms as i64)
53 .bind(params.success)
54 .bind(params.error_code)
55 .bind(params.policy_decision)
56 .bind(params.metadata)
57 .execute(pool)
58 .await
59 .map_err(|e| StorageError::Query { source: e })?;
60 Ok(())
61}
62
/// Logs a telemetry event under the default account.
///
/// Convenience wrapper delegating to [`log_telemetry_for`] with
/// [`DEFAULT_ACCOUNT_ID`].
pub async fn log_telemetry(
    pool: &DbPool,
    params: &TelemetryParams<'_>,
) -> Result<(), StorageError> {
    log_telemetry_for(pool, DEFAULT_ACCOUNT_ID, params).await
}
70
/// Aggregated call metrics for one (tool, category) pair, as produced by
/// [`get_metrics_since_for`].
#[derive(Debug, Clone, Serialize)]
pub struct ToolMetrics {
    /// Name of the tool.
    pub tool_name: String,
    /// Tool category this row aggregates over.
    pub category: String,
    /// Total number of calls in the window.
    pub total_calls: i64,
    /// Calls with `success = 1`.
    pub success_count: i64,
    /// Calls with `success = 0`.
    pub failure_count: i64,
    /// `success_count / total_calls`; 0.0 when there were no calls.
    pub success_rate: f64,
    /// Mean latency in milliseconds (SQL AVG).
    pub avg_latency_ms: f64,
    /// Median latency, computed client-side by nearest-index selection.
    pub p50_latency_ms: f64,
    /// 95th-percentile latency, computed client-side.
    pub p95_latency_ms: f64,
    /// Minimum observed latency in milliseconds.
    pub min_latency_ms: i64,
    /// Maximum observed latency in milliseconds.
    pub max_latency_ms: i64,
}
86
/// Internal row shape for the GROUP BY aggregation query in
/// [`get_metrics_since_for`]; field names match the SQL column aliases.
#[derive(sqlx::FromRow)]
struct MetricsAggRow {
    tool_name: String,
    category: String,
    // COUNT(*) per group.
    total: i64,
    // SUM of success = 1 rows.
    successes: i64,
    // SUM of success = 0 rows.
    failures: i64,
    // AVG(latency_ms).
    avg_lat: f64,
    // MIN(latency_ms).
    min_lat: i64,
    // MAX(latency_ms).
    max_lat: i64,
}
99
100pub async fn get_metrics_since_for(
102 pool: &DbPool,
103 account_id: &str,
104 since: &str,
105) -> Result<Vec<ToolMetrics>, StorageError> {
106 let rows: Vec<MetricsAggRow> = sqlx::query_as(
108 "SELECT tool_name, category, \
109 COUNT(*) as total, \
110 SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as successes, \
111 SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) as failures, \
112 AVG(latency_ms) as avg_lat, \
113 MIN(latency_ms) as min_lat, \
114 MAX(latency_ms) as max_lat \
115 FROM mcp_telemetry WHERE created_at >= ? AND account_id = ? \
116 GROUP BY tool_name, category ORDER BY total DESC",
117 )
118 .bind(since)
119 .bind(account_id)
120 .fetch_all(pool)
121 .await
122 .map_err(|e| StorageError::Query { source: e })?;
123
124 let mut results = Vec::with_capacity(rows.len());
125 for row in rows {
126 let MetricsAggRow {
127 tool_name,
128 category,
129 total,
130 successes,
131 failures,
132 avg_lat,
133 min_lat,
134 max_lat,
135 } = row;
136 let latencies: Vec<(i64,)> = sqlx::query_as(
138 "SELECT latency_ms FROM mcp_telemetry \
139 WHERE created_at >= ? AND tool_name = ? AND account_id = ? ORDER BY latency_ms ASC",
140 )
141 .bind(since)
142 .bind(&tool_name)
143 .bind(account_id)
144 .fetch_all(pool)
145 .await
146 .map_err(|e| StorageError::Query { source: e })?;
147
148 let p50 = percentile(&latencies, 50);
149 let p95 = percentile(&latencies, 95);
150 let success_rate = if total > 0 {
151 successes as f64 / total as f64
152 } else {
153 0.0
154 };
155
156 results.push(ToolMetrics {
157 tool_name,
158 category,
159 total_calls: total,
160 success_count: successes,
161 failure_count: failures,
162 success_rate,
163 avg_latency_ms: avg_lat,
164 p50_latency_ms: p50 as f64,
165 p95_latency_ms: p95 as f64,
166 min_latency_ms: min_lat,
167 max_latency_ms: max_lat,
168 });
169 }
170
171 Ok(results)
172}
173
/// Per-tool metrics for the default account; see [`get_metrics_since_for`].
pub async fn get_metrics_since(
    pool: &DbPool,
    since: &str,
) -> Result<Vec<ToolMetrics>, StorageError> {
    get_metrics_since_for(pool, DEFAULT_ACCOUNT_ID, since).await
}
181
/// Failure counts for one (tool, error_code) pair, as produced by
/// [`get_error_breakdown_for`].
#[derive(Debug, Clone, Serialize)]
pub struct ErrorBreakdown {
    /// Name of the failing tool.
    pub tool_name: String,
    /// Error code; rows with a NULL code are reported as "unknown".
    pub error_code: String,
    /// Number of failures with this code in the window.
    pub count: i64,
    /// Most recent `created_at` among those failures.
    pub latest_at: String,
}
190
191pub async fn get_error_breakdown_for(
193 pool: &DbPool,
194 account_id: &str,
195 since: &str,
196) -> Result<Vec<ErrorBreakdown>, StorageError> {
197 let rows: Vec<(String, String, i64, String)> = sqlx::query_as(
198 "SELECT tool_name, COALESCE(error_code, 'unknown') as err, \
199 COUNT(*) as cnt, MAX(created_at) as latest \
200 FROM mcp_telemetry \
201 WHERE created_at >= ? AND success = 0 AND account_id = ? \
202 GROUP BY tool_name, error_code \
203 ORDER BY cnt DESC",
204 )
205 .bind(since)
206 .bind(account_id)
207 .fetch_all(pool)
208 .await
209 .map_err(|e| StorageError::Query { source: e })?;
210
211 Ok(rows
212 .into_iter()
213 .map(|(tool_name, error_code, count, latest_at)| ErrorBreakdown {
214 tool_name,
215 error_code,
216 count,
217 latest_at,
218 })
219 .collect())
220}
221
/// Error breakdown for the default account; see [`get_error_breakdown_for`].
pub async fn get_error_breakdown(
    pool: &DbPool,
    since: &str,
) -> Result<Vec<ErrorBreakdown>, StorageError> {
    get_error_breakdown_for(pool, DEFAULT_ACCOUNT_ID, since).await
}
229
/// Account-wide telemetry totals for a time window, as produced by
/// [`get_summary_for`].
#[derive(Debug, Clone, Serialize)]
pub struct TelemetrySummary {
    /// Total calls in the window.
    pub total_calls: i64,
    /// Calls with `success = 1`.
    pub total_successes: i64,
    /// Calls with `success = 0`.
    pub total_failures: i64,
    /// `total_successes / total_calls`; 0.0 when there were no calls.
    pub overall_success_rate: f64,
    /// Mean latency in milliseconds across all calls.
    pub avg_latency_ms: f64,
    /// Number of distinct tool names seen.
    pub unique_tools: i64,
    /// Call count per policy decision; NULL decisions are keyed as "none".
    pub policy_decisions: HashMap<String, i64>,
}
241
242pub async fn get_summary_for(
244 pool: &DbPool,
245 account_id: &str,
246 since: &str,
247) -> Result<TelemetrySummary, StorageError> {
248 let (total, successes, failures, avg_lat): (i64, i64, i64, f64) = sqlx::query_as(
249 "SELECT COUNT(*), \
250 SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END), \
251 SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END), \
252 COALESCE(AVG(latency_ms), 0.0) \
253 FROM mcp_telemetry WHERE created_at >= ? AND account_id = ?",
254 )
255 .bind(since)
256 .bind(account_id)
257 .fetch_one(pool)
258 .await
259 .map_err(|e| StorageError::Query { source: e })?;
260
261 let (unique_tools,): (i64,) = sqlx::query_as(
262 "SELECT COUNT(DISTINCT tool_name) FROM mcp_telemetry WHERE created_at >= ? AND account_id = ?",
263 )
264 .bind(since)
265 .bind(account_id)
266 .fetch_one(pool)
267 .await
268 .map_err(|e| StorageError::Query { source: e })?;
269
270 let policy_rows: Vec<(String, i64)> = sqlx::query_as(
271 "SELECT COALESCE(policy_decision, 'none') as pd, COUNT(*) \
272 FROM mcp_telemetry WHERE created_at >= ? AND account_id = ? \
273 GROUP BY policy_decision",
274 )
275 .bind(since)
276 .bind(account_id)
277 .fetch_all(pool)
278 .await
279 .map_err(|e| StorageError::Query { source: e })?;
280
281 let overall_success_rate = if total > 0 {
282 successes as f64 / total as f64
283 } else {
284 0.0
285 };
286
287 Ok(TelemetrySummary {
288 total_calls: total,
289 total_successes: successes,
290 total_failures: failures,
291 overall_success_rate,
292 avg_latency_ms: avg_lat,
293 unique_tools,
294 policy_decisions: policy_rows.into_iter().collect(),
295 })
296}
297
/// Telemetry summary for the default account; see [`get_summary_for`].
pub async fn get_summary(pool: &DbPool, since: &str) -> Result<TelemetrySummary, StorageError> {
    get_summary_for(pool, DEFAULT_ACCOUNT_ID, since).await
}
302
303pub async fn get_recent_entries_for(
305 pool: &DbPool,
306 account_id: &str,
307 limit: u32,
308) -> Result<Vec<TelemetryEntry>, StorageError> {
309 sqlx::query_as::<_, TelemetryEntry>(
310 "SELECT id, tool_name, category, latency_ms, success, \
311 error_code, policy_decision, metadata, created_at \
312 FROM mcp_telemetry WHERE account_id = ? ORDER BY created_at DESC LIMIT ?",
313 )
314 .bind(account_id)
315 .bind(limit)
316 .fetch_all(pool)
317 .await
318 .map_err(|e| StorageError::Query { source: e })
319}
320
/// Recent entries for the default account; see [`get_recent_entries_for`].
pub async fn get_recent_entries(
    pool: &DbPool,
    limit: u32,
) -> Result<Vec<TelemetryEntry>, StorageError> {
    get_recent_entries_for(pool, DEFAULT_ACCOUNT_ID, limit).await
}
328
/// Selects the `pct`-th percentile from an ascending-sorted latency list by
/// nearest-index lookup: index = round(pct/100 * (len - 1)), clamped to the
/// last element. Returns 0 for an empty slice.
fn percentile(sorted: &[(i64,)], pct: u32) -> i64 {
    match sorted.len() {
        0 => 0,
        len => {
            let last = len - 1;
            let pos = (f64::from(pct) / 100.0 * last as f64).round() as usize;
            sorted[pos.min(last)].0
        }
    }
}
338
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::init_test_db;

    // Shorthand for logging one event under the default account with no
    // metadata; panics on failure so individual tests stay terse.
    async fn log(
        pool: &DbPool,
        tool: &str,
        cat: &str,
        ms: u64,
        ok: bool,
        err: Option<&str>,
        policy: Option<&str>,
    ) {
        log_telemetry(
            pool,
            &TelemetryParams {
                tool_name: tool,
                category: cat,
                latency_ms: ms,
                success: ok,
                error_code: err,
                policy_decision: policy,
                metadata: None,
            },
        )
        .await
        .expect("log telemetry");
    }

    // One logged call round-trips through the metrics aggregation.
    #[tokio::test]
    async fn log_and_retrieve_telemetry() {
        let pool = init_test_db().await.expect("init db");

        log(&pool, "get_stats", "analytics", 42, true, None, None).await;

        // `since` far in the past so the single row is always in-window.
        let metrics = get_metrics_since(&pool, "2000-01-01T00:00:00Z")
            .await
            .expect("metrics");
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].tool_name, "get_stats");
        assert_eq!(metrics[0].total_calls, 1);
        assert_eq!(metrics[0].success_count, 1);
        assert_eq!(metrics[0].failure_count, 0);
    }

    // Failures are grouped per error code and ordered by count descending.
    #[tokio::test]
    async fn error_breakdown_groups_by_code() {
        let pool = init_test_db().await.expect("init db");

        log(
            &pool,
            "compose_tweet",
            "mutation",
            100,
            false,
            Some("policy_denied_blocked"),
            Some("deny"),
        )
        .await;
        log(
            &pool,
            "compose_tweet",
            "mutation",
            50,
            false,
            Some("policy_denied_blocked"),
            Some("deny"),
        )
        .await;
        log(
            &pool,
            "compose_tweet",
            "mutation",
            80,
            false,
            Some("db_error"),
            None,
        )
        .await;

        let errors = get_error_breakdown(&pool, "2000-01-01T00:00:00Z")
            .await
            .expect("errors");
        assert_eq!(errors.len(), 2);
        // Counts differ (2 vs 1), so ORDER BY cnt DESC is deterministic here.
        assert_eq!(errors[0].error_code, "policy_denied_blocked");
        assert_eq!(errors[0].count, 2);
        assert_eq!(errors[1].error_code, "db_error");
        assert_eq!(errors[1].count, 1);
    }

    // Summary totals cover calls across multiple tools and outcomes.
    #[tokio::test]
    async fn summary_aggregates_correctly() {
        let pool = init_test_db().await.expect("init db");

        log(&pool, "get_stats", "analytics", 10, true, None, None).await;
        log(&pool, "get_stats", "analytics", 20, true, None, None).await;
        log(
            &pool,
            "compose_tweet",
            "mutation",
            50,
            false,
            Some("err"),
            Some("deny"),
        )
        .await;

        let summary = get_summary(&pool, "2000-01-01T00:00:00Z")
            .await
            .expect("summary");
        assert_eq!(summary.total_calls, 3);
        assert_eq!(summary.total_successes, 2);
        assert_eq!(summary.total_failures, 1);
        assert_eq!(summary.unique_tools, 2);
    }

    // All read paths must behave sensibly on a table with no rows.
    #[tokio::test]
    async fn empty_telemetry_returns_empty() {
        let pool = init_test_db().await.expect("init db");

        let metrics = get_metrics_since(&pool, "2000-01-01T00:00:00Z")
            .await
            .expect("metrics");
        assert!(metrics.is_empty());

        let errors = get_error_breakdown(&pool, "2000-01-01T00:00:00Z")
            .await
            .expect("errors");
        assert!(errors.is_empty());

        let summary = get_summary(&pool, "2000-01-01T00:00:00Z")
            .await
            .expect("summary");
        assert_eq!(summary.total_calls, 0);
        assert_eq!(summary.overall_success_rate, 0.0);
    }

    // Percentiles over an even spread land at or above the midpoint values.
    #[tokio::test]
    async fn percentile_calculation() {
        let pool = init_test_db().await.expect("init db");

        for ms in [10, 20, 30, 40, 50, 60, 70, 80, 90, 100] {
            log(&pool, "test_tool", "test", ms, true, None, None).await;
        }

        let metrics = get_metrics_since(&pool, "2000-01-01T00:00:00Z")
            .await
            .expect("metrics");
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].total_calls, 10);
        assert_eq!(metrics[0].min_latency_ms, 10);
        assert_eq!(metrics[0].max_latency_ms, 100);
        assert!(metrics[0].p50_latency_ms >= 50.0);
        assert!(metrics[0].p95_latency_ms >= 90.0);
    }
}
497}