// tuitbot_core/storage/mcp_telemetry.rs
1//! MCP execution telemetry storage.
2//!
3//! Records tool invocations with latency, success/failure, error codes,
4//! and policy decisions. Provides windowed aggregation queries for
5//! observability MCP tools.
6
7use super::accounts::DEFAULT_ACCOUNT_ID;
8use super::DbPool;
9use crate::error::StorageError;
10use serde::Serialize;
11use std::collections::HashMap;
12
/// A single telemetry record, as read back from the `mcp_telemetry` table.
#[derive(Debug, Clone, sqlx::FromRow, Serialize)]
pub struct TelemetryEntry {
    /// Database row id.
    pub id: i64,
    /// Name of the MCP tool that was invoked.
    pub tool_name: String,
    /// Tool category the caller logged the invocation under.
    pub category: String,
    /// Invocation latency in milliseconds.
    pub latency_ms: i64,
    /// Whether the invocation succeeded.
    pub success: bool,
    /// Error code recorded for a failed invocation, if any.
    pub error_code: Option<String>,
    /// Policy decision applied to the call (e.g. "deny"), if any.
    pub policy_decision: Option<String>,
    /// Optional free-form metadata payload.
    pub metadata: Option<String>,
    /// Insertion timestamp as stored in the database (string column).
    pub created_at: String,
}
26
/// Parameters for inserting a telemetry entry.
///
/// Borrowed variant of [`TelemetryEntry`]: callers pass string slices
/// and the storage layer owns nothing until the row is written.
pub struct TelemetryParams<'a> {
    /// Name of the MCP tool that was invoked.
    pub tool_name: &'a str,
    /// Tool category to log the invocation under.
    pub category: &'a str,
    /// Invocation latency in milliseconds (stored as `INTEGER`).
    pub latency_ms: u64,
    /// Whether the invocation succeeded.
    pub success: bool,
    /// Error code for a failed invocation, if any.
    pub error_code: Option<&'a str>,
    /// Policy decision applied to the call, if any.
    pub policy_decision: Option<&'a str>,
    /// Optional free-form metadata payload.
    pub metadata: Option<&'a str>,
}
37
38/// Insert a telemetry entry for a specific account.
39pub async fn log_telemetry_for(
40    pool: &DbPool,
41    account_id: &str,
42    params: &TelemetryParams<'_>,
43) -> Result<(), StorageError> {
44    sqlx::query(
45        "INSERT INTO mcp_telemetry \
46         (account_id, tool_name, category, latency_ms, success, error_code, policy_decision, metadata) \
47         VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
48    )
49    .bind(account_id)
50    .bind(params.tool_name)
51    .bind(params.category)
52    .bind(params.latency_ms as i64)
53    .bind(params.success)
54    .bind(params.error_code)
55    .bind(params.policy_decision)
56    .bind(params.metadata)
57    .execute(pool)
58    .await
59    .map_err(|e| StorageError::Query { source: e })?;
60    Ok(())
61}
62
63/// Insert a telemetry entry.
64pub async fn log_telemetry(
65    pool: &DbPool,
66    params: &TelemetryParams<'_>,
67) -> Result<(), StorageError> {
68    log_telemetry_for(pool, DEFAULT_ACCOUNT_ID, params).await
69}
70
/// Aggregated metrics for a single tool.
#[derive(Debug, Clone, Serialize)]
pub struct ToolMetrics {
    /// Name of the MCP tool.
    pub tool_name: String,
    /// Category the tool's invocations were logged under.
    pub category: String,
    /// Number of invocations in the window.
    pub total_calls: i64,
    /// Number of successful invocations.
    pub success_count: i64,
    /// Number of failed invocations.
    pub failure_count: i64,
    /// successes / total, or 0.0 when there were no calls.
    pub success_rate: f64,
    /// Mean latency in milliseconds.
    pub avg_latency_ms: f64,
    /// Median (50th percentile) latency, nearest-rank.
    pub p50_latency_ms: f64,
    /// 95th percentile latency, nearest-rank.
    pub p95_latency_ms: f64,
    /// Smallest observed latency in the window.
    pub min_latency_ms: i64,
    /// Largest observed latency in the window.
    pub max_latency_ms: i64,
}
86
/// Raw aggregate row from the metrics query.
///
/// Field names mirror the column aliases in the SQL so `sqlx::FromRow`
/// can map them directly.
#[derive(sqlx::FromRow)]
struct MetricsAggRow {
    tool_name: String,
    category: String,
    // COUNT(*) — always >= 1 per grouped row.
    total: i64,
    successes: i64,
    failures: i64,
    // AVG(latency_ms); non-null because each group has at least one row.
    avg_lat: f64,
    min_lat: i64,
    max_lat: i64,
}
99
100/// Get aggregated metrics per tool in a time window for a specific account.
101pub async fn get_metrics_since_for(
102    pool: &DbPool,
103    account_id: &str,
104    since: &str,
105) -> Result<Vec<ToolMetrics>, StorageError> {
106    // First get basic aggregates per tool
107    let rows: Vec<MetricsAggRow> = sqlx::query_as(
108        "SELECT tool_name, category, \
109         COUNT(*) as total, \
110         SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as successes, \
111         SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) as failures, \
112         AVG(latency_ms) as avg_lat, \
113         MIN(latency_ms) as min_lat, \
114         MAX(latency_ms) as max_lat \
115         FROM mcp_telemetry WHERE created_at >= ? AND account_id = ? \
116         GROUP BY tool_name, category ORDER BY total DESC",
117    )
118    .bind(since)
119    .bind(account_id)
120    .fetch_all(pool)
121    .await
122    .map_err(|e| StorageError::Query { source: e })?;
123
124    let mut results = Vec::with_capacity(rows.len());
125    for row in rows {
126        let MetricsAggRow {
127            tool_name,
128            category,
129            total,
130            successes,
131            failures,
132            avg_lat,
133            min_lat,
134            max_lat,
135        } = row;
136        // Compute percentiles by fetching sorted latencies for this tool
137        let latencies: Vec<(i64,)> = sqlx::query_as(
138            "SELECT latency_ms FROM mcp_telemetry \
139             WHERE created_at >= ? AND tool_name = ? AND account_id = ? ORDER BY latency_ms ASC",
140        )
141        .bind(since)
142        .bind(&tool_name)
143        .bind(account_id)
144        .fetch_all(pool)
145        .await
146        .map_err(|e| StorageError::Query { source: e })?;
147
148        let p50 = percentile(&latencies, 50);
149        let p95 = percentile(&latencies, 95);
150        let success_rate = if total > 0 {
151            successes as f64 / total as f64
152        } else {
153            0.0
154        };
155
156        results.push(ToolMetrics {
157            tool_name,
158            category,
159            total_calls: total,
160            success_count: successes,
161            failure_count: failures,
162            success_rate,
163            avg_latency_ms: avg_lat,
164            p50_latency_ms: p50 as f64,
165            p95_latency_ms: p95 as f64,
166            min_latency_ms: min_lat,
167            max_latency_ms: max_lat,
168        });
169    }
170
171    Ok(results)
172}
173
174/// Get aggregated metrics per tool in a time window.
175pub async fn get_metrics_since(
176    pool: &DbPool,
177    since: &str,
178) -> Result<Vec<ToolMetrics>, StorageError> {
179    get_metrics_since_for(pool, DEFAULT_ACCOUNT_ID, since).await
180}
181
/// Error breakdown: error_code → count, grouped by tool.
#[derive(Debug, Clone, Serialize)]
pub struct ErrorBreakdown {
    /// Name of the MCP tool that failed.
    pub tool_name: String,
    /// Recorded error code; "unknown" when the row had none.
    pub error_code: String,
    /// Number of failures with this (tool, code) pair in the window.
    pub count: i64,
    /// Timestamp of the most recent such failure.
    pub latest_at: String,
}
190
191/// Get error distribution since a timestamp for a specific account.
192pub async fn get_error_breakdown_for(
193    pool: &DbPool,
194    account_id: &str,
195    since: &str,
196) -> Result<Vec<ErrorBreakdown>, StorageError> {
197    let rows: Vec<(String, String, i64, String)> = sqlx::query_as(
198        "SELECT tool_name, COALESCE(error_code, 'unknown') as err, \
199         COUNT(*) as cnt, MAX(created_at) as latest \
200         FROM mcp_telemetry \
201         WHERE created_at >= ? AND success = 0 AND account_id = ? \
202         GROUP BY tool_name, error_code \
203         ORDER BY cnt DESC",
204    )
205    .bind(since)
206    .bind(account_id)
207    .fetch_all(pool)
208    .await
209    .map_err(|e| StorageError::Query { source: e })?;
210
211    Ok(rows
212        .into_iter()
213        .map(|(tool_name, error_code, count, latest_at)| ErrorBreakdown {
214            tool_name,
215            error_code,
216            count,
217            latest_at,
218        })
219        .collect())
220}
221
222/// Get error distribution since a timestamp.
223pub async fn get_error_breakdown(
224    pool: &DbPool,
225    since: &str,
226) -> Result<Vec<ErrorBreakdown>, StorageError> {
227    get_error_breakdown_for(pool, DEFAULT_ACCOUNT_ID, since).await
228}
229
/// Summary statistics across all tools.
#[derive(Debug, Clone, Serialize)]
pub struct TelemetrySummary {
    /// Total number of invocations in the window.
    pub total_calls: i64,
    /// Number of successful invocations.
    pub total_successes: i64,
    /// Number of failed invocations.
    pub total_failures: i64,
    /// successes / total, or 0.0 when there were no calls.
    pub overall_success_rate: f64,
    /// Mean latency across all invocations, in milliseconds.
    pub avg_latency_ms: f64,
    /// Number of distinct tool names seen in the window.
    pub unique_tools: i64,
    /// Count per policy decision; rows with none are keyed "none".
    pub policy_decisions: HashMap<String, i64>,
}
241
242/// Get summary statistics since a timestamp for a specific account.
243pub async fn get_summary_for(
244    pool: &DbPool,
245    account_id: &str,
246    since: &str,
247) -> Result<TelemetrySummary, StorageError> {
248    let (total, successes, failures, avg_lat): (i64, i64, i64, f64) = sqlx::query_as(
249        "SELECT COUNT(*), \
250         SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END), \
251         SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END), \
252         COALESCE(AVG(latency_ms), 0.0) \
253         FROM mcp_telemetry WHERE created_at >= ? AND account_id = ?",
254    )
255    .bind(since)
256    .bind(account_id)
257    .fetch_one(pool)
258    .await
259    .map_err(|e| StorageError::Query { source: e })?;
260
261    let (unique_tools,): (i64,) = sqlx::query_as(
262        "SELECT COUNT(DISTINCT tool_name) FROM mcp_telemetry WHERE created_at >= ? AND account_id = ?",
263    )
264    .bind(since)
265    .bind(account_id)
266    .fetch_one(pool)
267    .await
268    .map_err(|e| StorageError::Query { source: e })?;
269
270    let policy_rows: Vec<(String, i64)> = sqlx::query_as(
271        "SELECT COALESCE(policy_decision, 'none') as pd, COUNT(*) \
272         FROM mcp_telemetry WHERE created_at >= ? AND account_id = ? \
273         GROUP BY policy_decision",
274    )
275    .bind(since)
276    .bind(account_id)
277    .fetch_all(pool)
278    .await
279    .map_err(|e| StorageError::Query { source: e })?;
280
281    let overall_success_rate = if total > 0 {
282        successes as f64 / total as f64
283    } else {
284        0.0
285    };
286
287    Ok(TelemetrySummary {
288        total_calls: total,
289        total_successes: successes,
290        total_failures: failures,
291        overall_success_rate,
292        avg_latency_ms: avg_lat,
293        unique_tools,
294        policy_decisions: policy_rows.into_iter().collect(),
295    })
296}
297
298/// Get summary statistics since a timestamp.
299pub async fn get_summary(pool: &DbPool, since: &str) -> Result<TelemetrySummary, StorageError> {
300    get_summary_for(pool, DEFAULT_ACCOUNT_ID, since).await
301}
302
303/// Get recent telemetry entries for a specific account, ordered newest-first.
304pub async fn get_recent_entries_for(
305    pool: &DbPool,
306    account_id: &str,
307    limit: u32,
308) -> Result<Vec<TelemetryEntry>, StorageError> {
309    sqlx::query_as::<_, TelemetryEntry>(
310        "SELECT id, tool_name, category, latency_ms, success, \
311         error_code, policy_decision, metadata, created_at \
312         FROM mcp_telemetry WHERE account_id = ? ORDER BY created_at DESC LIMIT ?",
313    )
314    .bind(account_id)
315    .bind(limit)
316    .fetch_all(pool)
317    .await
318    .map_err(|e| StorageError::Query { source: e })
319}
320
321/// Get recent telemetry entries, ordered newest-first.
322pub async fn get_recent_entries(
323    pool: &DbPool,
324    limit: u32,
325) -> Result<Vec<TelemetryEntry>, StorageError> {
326    get_recent_entries_for(pool, DEFAULT_ACCOUNT_ID, limit).await
327}
328
/// Compute a nearest-rank percentile from ascending-sorted latency rows.
///
/// Returns 0 for an empty slice. The index is `pct/100 * (len - 1)`,
/// rounded half away from zero and clamped to the last element.
fn percentile(sorted: &[(i64,)], pct: u32) -> i64 {
    match sorted.len() {
        0 => 0,
        n => {
            let pos = (f64::from(pct) / 100.0 * ((n - 1) as f64)).round() as usize;
            sorted[pos.min(n - 1)].0
        }
    }
}
338
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::init_test_db;

    /// Shorthand: insert one telemetry row under the default account.
    async fn record(
        pool: &DbPool,
        tool: &str,
        cat: &str,
        ms: u64,
        ok: bool,
        err: Option<&str>,
        policy: Option<&str>,
    ) {
        let params = TelemetryParams {
            tool_name: tool,
            category: cat,
            latency_ms: ms,
            success: ok,
            error_code: err,
            policy_decision: policy,
            metadata: None,
        };
        log_telemetry(pool, &params).await.expect("log telemetry");
    }

    #[tokio::test]
    async fn log_and_retrieve_telemetry() {
        let pool = init_test_db().await.expect("init db");

        record(&pool, "get_stats", "analytics", 42, true, None, None).await;

        let metrics = get_metrics_since(&pool, "2000-01-01T00:00:00Z")
            .await
            .expect("metrics");
        assert_eq!(metrics.len(), 1);
        let m = &metrics[0];
        assert_eq!(m.tool_name, "get_stats");
        assert_eq!(m.total_calls, 1);
        assert_eq!(m.success_count, 1);
        assert_eq!(m.failure_count, 0);
    }

    #[tokio::test]
    async fn error_breakdown_groups_by_code() {
        let pool = init_test_db().await.expect("init db");

        let failures: [(u64, Option<&str>, Option<&str>); 3] = [
            (100, Some("policy_denied_blocked"), Some("deny")),
            (50, Some("policy_denied_blocked"), Some("deny")),
            (80, Some("db_error"), None),
        ];
        for (ms, code, policy) in failures {
            record(&pool, "compose_tweet", "mutation", ms, false, code, policy).await;
        }

        let errors = get_error_breakdown(&pool, "2000-01-01T00:00:00Z")
            .await
            .expect("errors");
        assert_eq!(errors.len(), 2);
        // Highest failure count first.
        assert_eq!(errors[0].error_code, "policy_denied_blocked");
        assert_eq!(errors[0].count, 2);
        assert_eq!(errors[1].error_code, "db_error");
        assert_eq!(errors[1].count, 1);
    }

    #[tokio::test]
    async fn summary_aggregates_correctly() {
        let pool = init_test_db().await.expect("init db");

        record(&pool, "get_stats", "analytics", 10, true, None, None).await;
        record(&pool, "get_stats", "analytics", 20, true, None, None).await;
        record(
            &pool,
            "compose_tweet",
            "mutation",
            50,
            false,
            Some("err"),
            Some("deny"),
        )
        .await;

        let summary = get_summary(&pool, "2000-01-01T00:00:00Z")
            .await
            .expect("summary");
        assert_eq!(summary.total_calls, 3);
        assert_eq!(summary.total_successes, 2);
        assert_eq!(summary.total_failures, 1);
        assert_eq!(summary.unique_tools, 2);
    }

    #[tokio::test]
    async fn empty_telemetry_returns_empty() {
        let pool = init_test_db().await.expect("init db");
        let window = "2000-01-01T00:00:00Z";

        let metrics = get_metrics_since(&pool, window).await.expect("metrics");
        assert!(metrics.is_empty());

        let errors = get_error_breakdown(&pool, window).await.expect("errors");
        assert!(errors.is_empty());

        let summary = get_summary(&pool, window).await.expect("summary");
        assert_eq!(summary.total_calls, 0);
        assert_eq!(summary.overall_success_rate, 0.0);
    }

    #[tokio::test]
    async fn percentile_calculation() {
        let pool = init_test_db().await.expect("init db");

        // Latencies 10, 20, ..., 100 for one tool.
        for step in 1..=10u64 {
            record(&pool, "test_tool", "test", step * 10, true, None, None).await;
        }

        let metrics = get_metrics_since(&pool, "2000-01-01T00:00:00Z")
            .await
            .expect("metrics");
        assert_eq!(metrics.len(), 1);
        let m = &metrics[0];
        assert_eq!(m.total_calls, 10);
        assert_eq!(m.min_latency_ms, 10);
        assert_eq!(m.max_latency_ms, 100);
        // Nearest-rank percentiles land in the upper half of the spread.
        assert!(m.p50_latency_ms >= 50.0);
        assert!(m.p95_latency_ms >= 90.0);
    }
}