Skip to main content

mockforge_registry_server/handlers/
observability.rs

1//! Observability saved-queries + dashboards handlers
2//! (cloud-enablement task #2 / Phase 1).
3//!
4//! Cross-deployment query handlers themselves live in a follow-up
5//! slice — this file owns the persistence layer for users' named
6//! filters and dashboard layouts, which is enough for the UI to render
7//! the "saved searches" sidebar and the dashboard list.
8//!
9//! Routes:
10//!   GET    /api/v1/organizations/{org_id}/observability/saved-queries[?kind=]
11//!   POST   /api/v1/organizations/{org_id}/observability/saved-queries
12//!   PATCH  /api/v1/observability/saved-queries/{id}
13//!   DELETE /api/v1/observability/saved-queries/{id}
14//!
15//!   GET    /api/v1/organizations/{org_id}/observability/dashboards
16//!   POST   /api/v1/organizations/{org_id}/observability/dashboards
17//!   PATCH  /api/v1/observability/dashboards/{id}
18//!   DELETE /api/v1/observability/dashboards/{id}
19
20use axum::{
21    extract::{Path, Query, State},
22    http::HeaderMap,
23    Json,
24};
25use mockforge_registry_core::models::observability_query::{CreateDashboard, CreateSavedQuery};
26use serde::Deserialize;
27use uuid::Uuid;
28
29use crate::{
30    error::{ApiError, ApiResult},
31    middleware::{resolve_org_context, AuthUser},
32    models::{ObservabilityDashboard, ObservabilitySavedQuery},
33    AppState,
34};
35
36#[derive(Debug, Deserialize)]
37pub struct ListQueriesQuery {
38    #[serde(default)]
39    pub kind: Option<String>,
40}
41
42/// `GET /api/v1/organizations/{org_id}/observability/saved-queries`
43pub async fn list_saved_queries(
44    State(state): State<AppState>,
45    AuthUser(user_id): AuthUser,
46    Path(org_id): Path<Uuid>,
47    Query(query): Query<ListQueriesQuery>,
48    headers: HeaderMap,
49) -> ApiResult<Json<Vec<ObservabilitySavedQuery>>> {
50    authorize_org(&state, user_id, &headers, org_id).await?;
51    let rows = ObservabilitySavedQuery::list_by_org(state.db.pool(), org_id, query.kind.as_deref())
52        .await
53        .map_err(ApiError::Database)?;
54    Ok(Json(rows))
55}
56
57#[derive(Debug, Deserialize)]
58pub struct CreateSavedQueryRequest {
59    pub name: String,
60    #[serde(default)]
61    pub description: Option<String>,
62    pub kind: String,
63    pub filters: serde_json::Value,
64    #[serde(default)]
65    pub workspace_id: Option<Uuid>,
66}
67
68/// `POST /api/v1/organizations/{org_id}/observability/saved-queries`
69pub async fn create_saved_query(
70    State(state): State<AppState>,
71    AuthUser(user_id): AuthUser,
72    Path(org_id): Path<Uuid>,
73    headers: HeaderMap,
74    Json(request): Json<CreateSavedQueryRequest>,
75) -> ApiResult<Json<ObservabilitySavedQuery>> {
76    authorize_org(&state, user_id, &headers, org_id).await?;
77    if request.name.trim().is_empty() {
78        return Err(ApiError::InvalidRequest("name must not be empty".into()));
79    }
80    if !ObservabilitySavedQuery::is_valid_kind(&request.kind) {
81        return Err(ApiError::InvalidRequest(format!(
82            "kind must be one of: {}",
83            ObservabilitySavedQuery::VALID_KINDS.join(", ")
84        )));
85    }
86
87    let row = ObservabilitySavedQuery::create(
88        state.db.pool(),
89        CreateSavedQuery {
90            org_id,
91            workspace_id: request.workspace_id,
92            name: &request.name,
93            description: request.description.as_deref(),
94            kind: &request.kind,
95            filters: &request.filters,
96            created_by: Some(user_id),
97        },
98    )
99    .await
100    .map_err(ApiError::Database)?;
101    Ok(Json(row))
102}
103
104#[derive(Debug, Deserialize)]
105pub struct UpdateSavedQueryRequest {
106    #[serde(default)]
107    pub name: Option<String>,
108    #[serde(default)]
109    pub filters: Option<serde_json::Value>,
110}
111
112/// `PATCH /api/v1/observability/saved-queries/{id}`
113pub async fn update_saved_query(
114    State(state): State<AppState>,
115    AuthUser(user_id): AuthUser,
116    Path(id): Path<Uuid>,
117    headers: HeaderMap,
118    Json(request): Json<UpdateSavedQueryRequest>,
119) -> ApiResult<Json<ObservabilitySavedQuery>> {
120    let existing = load_authorized_query(&state, user_id, &headers, id).await?;
121    let _ = existing;
122    let updated = ObservabilitySavedQuery::update(
123        state.db.pool(),
124        id,
125        request.name.as_deref(),
126        request.filters.as_ref(),
127    )
128    .await
129    .map_err(ApiError::Database)?
130    .ok_or_else(|| ApiError::InvalidRequest("Saved query not found".into()))?;
131    Ok(Json(updated))
132}
133
134/// `DELETE /api/v1/observability/saved-queries/{id}`
135pub async fn delete_saved_query(
136    State(state): State<AppState>,
137    AuthUser(user_id): AuthUser,
138    Path(id): Path<Uuid>,
139    headers: HeaderMap,
140) -> ApiResult<Json<serde_json::Value>> {
141    load_authorized_query(&state, user_id, &headers, id).await?;
142    let deleted = ObservabilitySavedQuery::delete(state.db.pool(), id)
143        .await
144        .map_err(ApiError::Database)?;
145    if !deleted {
146        return Err(ApiError::InvalidRequest("Saved query not found".into()));
147    }
148    Ok(Json(serde_json::json!({ "deleted": true })))
149}
150
151// --- dashboards ------------------------------------------------------------
152
153/// `GET /api/v1/organizations/{org_id}/observability/dashboards`
154pub async fn list_dashboards(
155    State(state): State<AppState>,
156    AuthUser(user_id): AuthUser,
157    Path(org_id): Path<Uuid>,
158    headers: HeaderMap,
159) -> ApiResult<Json<Vec<ObservabilityDashboard>>> {
160    authorize_org(&state, user_id, &headers, org_id).await?;
161    let rows = ObservabilityDashboard::list_by_org(state.db.pool(), org_id)
162        .await
163        .map_err(ApiError::Database)?;
164    Ok(Json(rows))
165}
166
167#[derive(Debug, Deserialize)]
168pub struct CreateDashboardRequest {
169    pub name: String,
170    #[serde(default)]
171    pub description: Option<String>,
172    pub layout: serde_json::Value,
173    pub queries: serde_json::Value,
174    #[serde(default)]
175    pub workspace_id: Option<Uuid>,
176}
177
178/// `POST /api/v1/organizations/{org_id}/observability/dashboards`
179pub async fn create_dashboard(
180    State(state): State<AppState>,
181    AuthUser(user_id): AuthUser,
182    Path(org_id): Path<Uuid>,
183    headers: HeaderMap,
184    Json(request): Json<CreateDashboardRequest>,
185) -> ApiResult<Json<ObservabilityDashboard>> {
186    authorize_org(&state, user_id, &headers, org_id).await?;
187    if request.name.trim().is_empty() {
188        return Err(ApiError::InvalidRequest("name must not be empty".into()));
189    }
190    let row = ObservabilityDashboard::create(
191        state.db.pool(),
192        CreateDashboard {
193            org_id,
194            workspace_id: request.workspace_id,
195            name: &request.name,
196            description: request.description.as_deref(),
197            layout: &request.layout,
198            queries: &request.queries,
199            created_by: Some(user_id),
200        },
201    )
202    .await
203    .map_err(ApiError::Database)?;
204    Ok(Json(row))
205}
206
207#[derive(Debug, Deserialize)]
208pub struct UpdateDashboardRequest {
209    #[serde(default)]
210    pub name: Option<String>,
211    #[serde(default)]
212    pub layout: Option<serde_json::Value>,
213    #[serde(default)]
214    pub queries: Option<serde_json::Value>,
215}
216
217/// `PATCH /api/v1/observability/dashboards/{id}`
218pub async fn update_dashboard(
219    State(state): State<AppState>,
220    AuthUser(user_id): AuthUser,
221    Path(id): Path<Uuid>,
222    headers: HeaderMap,
223    Json(request): Json<UpdateDashboardRequest>,
224) -> ApiResult<Json<ObservabilityDashboard>> {
225    load_authorized_dashboard(&state, user_id, &headers, id).await?;
226    let updated = ObservabilityDashboard::update(
227        state.db.pool(),
228        id,
229        request.name.as_deref(),
230        request.layout.as_ref(),
231        request.queries.as_ref(),
232    )
233    .await
234    .map_err(ApiError::Database)?
235    .ok_or_else(|| ApiError::InvalidRequest("Dashboard not found".into()))?;
236    Ok(Json(updated))
237}
238
239/// `DELETE /api/v1/observability/dashboards/{id}`
240pub async fn delete_dashboard(
241    State(state): State<AppState>,
242    AuthUser(user_id): AuthUser,
243    Path(id): Path<Uuid>,
244    headers: HeaderMap,
245) -> ApiResult<Json<serde_json::Value>> {
246    load_authorized_dashboard(&state, user_id, &headers, id).await?;
247    let deleted = ObservabilityDashboard::delete(state.db.pool(), id)
248        .await
249        .map_err(ApiError::Database)?;
250    if !deleted {
251        return Err(ApiError::InvalidRequest("Dashboard not found".into()));
252    }
253    Ok(Json(serde_json::json!({ "deleted": true })))
254}
255
256async fn authorize_org(
257    state: &AppState,
258    user_id: Uuid,
259    headers: &HeaderMap,
260    org_id: Uuid,
261) -> ApiResult<()> {
262    let ctx = resolve_org_context(state, user_id, headers, None)
263        .await
264        .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
265    if ctx.org_id != org_id {
266        return Err(ApiError::InvalidRequest(
267            "Cannot access observability for a different org".into(),
268        ));
269    }
270    Ok(())
271}
272
273async fn load_authorized_query(
274    state: &AppState,
275    user_id: Uuid,
276    headers: &HeaderMap,
277    id: Uuid,
278) -> ApiResult<ObservabilitySavedQuery> {
279    let row = ObservabilitySavedQuery::find_by_id(state.db.pool(), id)
280        .await
281        .map_err(ApiError::Database)?
282        .ok_or_else(|| ApiError::InvalidRequest("Saved query not found".into()))?;
283    let ctx = resolve_org_context(state, user_id, headers, None)
284        .await
285        .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
286    if ctx.org_id != row.org_id {
287        return Err(ApiError::InvalidRequest("Saved query not found".into()));
288    }
289    Ok(row)
290}
291
292async fn load_authorized_dashboard(
293    state: &AppState,
294    user_id: Uuid,
295    headers: &HeaderMap,
296    id: Uuid,
297) -> ApiResult<ObservabilityDashboard> {
298    let row = ObservabilityDashboard::find_by_id(state.db.pool(), id)
299        .await
300        .map_err(ApiError::Database)?
301        .ok_or_else(|| ApiError::InvalidRequest("Dashboard not found".into()))?;
302    let ctx = resolve_org_context(state, user_id, headers, None)
303        .await
304        .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
305    if ctx.org_id != row.org_id {
306        return Err(ApiError::InvalidRequest("Dashboard not found".into()));
307    }
308    Ok(row)
309}
310
311// --- cross-deployment trace query ------------------------------------------
312
313/// One trace span row. Mirrors the runtime_traces table minus the
314/// internal `id` / `received_at` plumbing the UI doesn't need.
315#[derive(Debug, serde::Serialize, sqlx::FromRow)]
316pub struct TraceSpanRow {
317    pub deployment_id: Uuid,
318    pub trace_id: String,
319    pub span_id: String,
320    pub parent_span_id: Option<String>,
321    pub service_name: Option<String>,
322    pub name: String,
323    pub kind: Option<i16>,
324    pub start_unix_nano: i64,
325    pub end_unix_nano: i64,
326    pub occurred_at: chrono::DateTime<chrono::Utc>,
327    pub status_code: Option<i16>,
328    pub status_message: Option<String>,
329    pub attributes: serde_json::Value,
330}
331
332#[derive(Debug, Deserialize)]
333pub struct TraceQueryRequest {
334    /// Restrict to one deployment. None = all deployments in the org.
335    #[serde(default)]
336    pub deployment_id: Option<Uuid>,
337    /// Filter by service_name (exact match — the OTel resource attr).
338    #[serde(default)]
339    pub service_name: Option<String>,
340    /// Free-text name filter (LIKE %query%).
341    #[serde(default)]
342    pub name_contains: Option<String>,
343    /// Status filter: "ok" | "error" | "any" (default).
344    #[serde(default)]
345    pub status: Option<String>,
346    /// ISO-8601; defaults to 1h ago.
347    #[serde(default)]
348    pub since: Option<chrono::DateTime<chrono::Utc>>,
349    /// ISO-8601; defaults to now.
350    #[serde(default)]
351    pub until: Option<chrono::DateTime<chrono::Utc>>,
352    /// Page size, capped at 500.
353    #[serde(default)]
354    pub limit: Option<i64>,
355}
356
357/// `POST /api/v1/organizations/{org_id}/observability/traces/query`
358///
359/// Cross-deployment trace search scoped to the caller's org. Joins
360/// `runtime_traces` against `hosted_mocks` so the org_id check is one
361/// authoritative WHERE clause, not a per-row filter the caller could
362/// fool. Runs as POST (not GET) because the filter set is too wide for
363/// a sane query string.
364pub async fn query_traces(
365    State(state): State<AppState>,
366    AuthUser(user_id): AuthUser,
367    Path(org_id): Path<Uuid>,
368    headers: HeaderMap,
369    Json(req): Json<TraceQueryRequest>,
370) -> ApiResult<Json<Vec<TraceSpanRow>>> {
371    authorize_org(&state, user_id, &headers, org_id).await?;
372
373    let limit = req.limit.unwrap_or(200).clamp(1, 500);
374    let until = req.until.unwrap_or_else(chrono::Utc::now);
375    let since = req.since.unwrap_or_else(|| chrono::Utc::now() - chrono::Duration::hours(1));
376    if until < since {
377        return Err(ApiError::InvalidRequest("until must be >= since".into()));
378    }
379
380    let status_filter: Option<i16> = match req.status.as_deref() {
381        Some("ok") => Some(1),
382        Some("error") => Some(2),
383        Some("any") | None => None,
384        Some(other) => {
385            return Err(ApiError::InvalidRequest(format!(
386                "status must be 'ok', 'error', or 'any' — got '{other}'"
387            )));
388        }
389    };
390
391    let name_pattern: Option<String> = req
392        .name_contains
393        .as_ref()
394        .map(|s| format!("%{}%", s.replace('%', r"\%").replace('_', r"\_")));
395
396    let rows = sqlx::query_as::<_, TraceSpanRow>(
397        r#"
398        SELECT t.deployment_id, t.trace_id, t.span_id, t.parent_span_id,
399               t.service_name, t.name, t.kind,
400               t.start_unix_nano, t.end_unix_nano, t.occurred_at,
401               t.status_code, t.status_message, t.attributes
402          FROM runtime_traces t
403          JOIN hosted_mocks d ON d.id = t.deployment_id
404         WHERE d.org_id = $1
405           AND t.occurred_at >= $2
406           AND t.occurred_at <= $3
407           AND ($4::uuid IS NULL OR t.deployment_id = $4)
408           AND ($5::text IS NULL OR t.service_name = $5)
409           AND ($6::text IS NULL OR t.name ILIKE $6)
410           AND ($7::int2 IS NULL OR t.status_code = $7)
411         ORDER BY t.occurred_at DESC
412         LIMIT $8
413        "#,
414    )
415    .bind(org_id)
416    .bind(since)
417    .bind(until)
418    .bind(req.deployment_id)
419    .bind(req.service_name)
420    .bind(name_pattern)
421    .bind(status_filter)
422    .bind(limit)
423    .fetch_all(state.db.pool())
424    .await
425    .map_err(ApiError::Database)?;
426
427    Ok(Json(rows))
428}
429
430// --- saved-query execution (#465) ------------------------------------------
431
432/// One result bucket. `label` is the group key for grouped queries
433/// (e.g. status code "200", "404") or `"all"` for ungrouped totals.
434#[derive(Debug, serde::Serialize)]
435pub struct QueryBucket {
436    pub label: String,
437    pub count: i64,
438}
439
440#[derive(Debug, serde::Serialize)]
441pub struct ExecuteQueryResponse {
442    pub metric: String,
443    pub total: i64,
444    pub window_minutes: i64,
445    pub series: Vec<QueryBucket>,
446}
447
448#[derive(Debug, Deserialize, Default)]
449pub struct ExecuteQueryRequest {
450    /// Override the saved query's workspace scope. None falls back to
451    /// `SavedQuery.workspace_id`. None on both means org-wide.
452    #[serde(default)]
453    pub workspace_id: Option<Uuid>,
454    /// Override the window encoded in `filters.window_minutes` for ad-hoc
455    /// time-range tweaks from the UI.
456    #[serde(default)]
457    pub window_minutes: Option<i64>,
458}
459
460/// `POST /api/v1/observability/saved-queries/{id}/execute`
461///
462/// Runs the saved query's `filters` payload against runtime data and
463/// returns a flat `{ metric, total, window_minutes, series }` shape the
464/// UI's tile components consume directly. Phase 1 supports three
465/// `filters.kind` shapes — `request_count`, `request_count_by_status`,
466/// `incident_count` — each with an optional `window_minutes`.
467pub async fn execute_saved_query(
468    State(state): State<AppState>,
469    AuthUser(user_id): AuthUser,
470    Path(id): Path<Uuid>,
471    headers: HeaderMap,
472    body: Option<Json<ExecuteQueryRequest>>,
473) -> ApiResult<Json<ExecuteQueryResponse>> {
474    let req = body.map(|Json(b)| b).unwrap_or_default();
475    let query = load_authorized_query(&state, user_id, &headers, id).await?;
476
477    let kind = query
478        .filters
479        .get("kind")
480        .and_then(|v| v.as_str())
481        .ok_or_else(|| {
482            ApiError::InvalidRequest("Saved query filters missing required `kind`".into())
483        })?
484        .to_string();
485    let window_minutes = req
486        .window_minutes
487        .or_else(|| query.filters.get("window_minutes").and_then(|v| v.as_i64()))
488        .unwrap_or(15)
489        .clamp(1, 24 * 60);
490    let workspace_filter = req.workspace_id.or(query.workspace_id);
491
492    let pool = state.db.pool();
493    let response = match kind.as_str() {
494        "request_count" => run_request_count(pool, query.org_id, workspace_filter, window_minutes)
495            .await
496            .map_err(ApiError::Database)?,
497        "request_count_by_status" => {
498            run_request_count_by_status(pool, query.org_id, workspace_filter, window_minutes)
499                .await
500                .map_err(ApiError::Database)?
501        }
502        "incident_count" => {
503            run_incident_count(pool, query.org_id, workspace_filter, window_minutes)
504                .await
505                .map_err(ApiError::Database)?
506        }
507        other => {
508            return Err(ApiError::InvalidRequest(format!(
509                "Unsupported saved-query kind '{other}'. Supported: request_count, request_count_by_status, incident_count"
510            )));
511        }
512    };
513
514    Ok(Json(response))
515}
516
517/// Total request count over the window. Workspace-scoped when
518/// `workspace_id` is set; otherwise the `runtime_captures.workspace_id`
519/// filter is dropped (org-wide).
520///
521/// Note (#462): captures shipped from in-container hosted-mocks land
522/// with `workspace_id IS NULL` today, so workspace-scoped counts only
523/// reflect `--cloud-ship` traffic until the shipper backfill ships.
524async fn run_request_count(
525    pool: &sqlx::PgPool,
526    org_id: Uuid,
527    workspace_id: Option<Uuid>,
528    window_minutes: i64,
529) -> sqlx::Result<ExecuteQueryResponse> {
530    let total: i64 = if let Some(ws) = workspace_id {
531        sqlx::query_scalar(
532            r#"
533            SELECT COUNT(*)::BIGINT
534            FROM runtime_captures
535            WHERE workspace_id = $1
536              AND occurred_at >= NOW() - make_interval(mins => $2::int)
537            "#,
538        )
539        .bind(ws)
540        .bind(window_minutes as i32)
541        .fetch_one(pool)
542        .await?
543    } else {
544        sqlx::query_scalar(
545            r#"
546            SELECT COUNT(*)::BIGINT
547            FROM runtime_captures rc
548            JOIN hosted_mocks hm ON hm.id = rc.deployment_id
549            WHERE hm.org_id = $1
550              AND rc.occurred_at >= NOW() - make_interval(mins => $2::int)
551            "#,
552        )
553        .bind(org_id)
554        .bind(window_minutes as i32)
555        .fetch_one(pool)
556        .await?
557    };
558
559    Ok(ExecuteQueryResponse {
560        metric: "request_count".into(),
561        total,
562        window_minutes,
563        series: vec![QueryBucket {
564            label: "all".into(),
565            count: total,
566        }],
567    })
568}
569
570async fn run_request_count_by_status(
571    pool: &sqlx::PgPool,
572    org_id: Uuid,
573    workspace_id: Option<Uuid>,
574    window_minutes: i64,
575) -> sqlx::Result<ExecuteQueryResponse> {
576    // `COALESCE(response_status_code, status_code)` mirrors what the
577    // request-log endpoint surfaces — a "request that never got a
578    // response" still has a status_code on the request side.
579    let rows: Vec<(Option<i32>, i64)> = if let Some(ws) = workspace_id {
580        sqlx::query_as(
581            r#"
582            SELECT COALESCE(response_status_code, status_code) AS status,
583                   COUNT(*)::BIGINT
584            FROM runtime_captures
585            WHERE workspace_id = $1
586              AND occurred_at >= NOW() - make_interval(mins => $2::int)
587            GROUP BY status
588            ORDER BY status NULLS LAST
589            "#,
590        )
591        .bind(ws)
592        .bind(window_minutes as i32)
593        .fetch_all(pool)
594        .await?
595    } else {
596        sqlx::query_as(
597            r#"
598            SELECT COALESCE(rc.response_status_code, rc.status_code) AS status,
599                   COUNT(*)::BIGINT
600            FROM runtime_captures rc
601            JOIN hosted_mocks hm ON hm.id = rc.deployment_id
602            WHERE hm.org_id = $1
603              AND rc.occurred_at >= NOW() - make_interval(mins => $2::int)
604            GROUP BY status
605            ORDER BY status NULLS LAST
606            "#,
607        )
608        .bind(org_id)
609        .bind(window_minutes as i32)
610        .fetch_all(pool)
611        .await?
612    };
613
614    let total = rows.iter().map(|(_, c)| *c).sum();
615    let series = rows
616        .into_iter()
617        .map(|(status, count)| QueryBucket {
618            label: status.map(|s| s.to_string()).unwrap_or_else(|| "unknown".into()),
619            count,
620        })
621        .collect();
622
623    Ok(ExecuteQueryResponse {
624        metric: "request_count_by_status".into(),
625        total,
626        window_minutes,
627        series,
628    })
629}
630
631async fn run_incident_count(
632    pool: &sqlx::PgPool,
633    org_id: Uuid,
634    workspace_id: Option<Uuid>,
635    window_minutes: i64,
636) -> sqlx::Result<ExecuteQueryResponse> {
637    let rows: Vec<(String, i64)> = if let Some(ws) = workspace_id {
638        sqlx::query_as(
639            r#"
640            SELECT severity, COUNT(*)::BIGINT
641            FROM incidents
642            WHERE workspace_id = $1
643              AND created_at >= NOW() - make_interval(mins => $2::int)
644              AND status != 'resolved'
645            GROUP BY severity
646            ORDER BY CASE severity
647                       WHEN 'critical' THEN 0
648                       WHEN 'high' THEN 1
649                       WHEN 'medium' THEN 2
650                       WHEN 'low' THEN 3
651                       ELSE 4
652                     END
653            "#,
654        )
655        .bind(ws)
656        .bind(window_minutes as i32)
657        .fetch_all(pool)
658        .await?
659    } else {
660        sqlx::query_as(
661            r#"
662            SELECT severity, COUNT(*)::BIGINT
663            FROM incidents
664            WHERE org_id = $1
665              AND created_at >= NOW() - make_interval(mins => $2::int)
666              AND status != 'resolved'
667            GROUP BY severity
668            ORDER BY CASE severity
669                       WHEN 'critical' THEN 0
670                       WHEN 'high' THEN 1
671                       WHEN 'medium' THEN 2
672                       WHEN 'low' THEN 3
673                       ELSE 4
674                     END
675            "#,
676        )
677        .bind(org_id)
678        .bind(window_minutes as i32)
679        .fetch_all(pool)
680        .await?
681    };
682
683    let total = rows.iter().map(|(_, c)| *c).sum();
684    let series = rows
685        .into_iter()
686        .map(|(severity, count)| QueryBucket {
687            label: severity,
688            count,
689        })
690        .collect();
691
692    Ok(ExecuteQueryResponse {
693        metric: "incident_count".into(),
694        total,
695        window_minutes,
696        series,
697    })
698}