1use axum::{
21 extract::{Path, Query, State},
22 http::HeaderMap,
23 Json,
24};
25use mockforge_registry_core::models::observability_query::{CreateDashboard, CreateSavedQuery};
26use serde::Deserialize;
27use uuid::Uuid;
28
29use crate::{
30 error::{ApiError, ApiResult},
31 middleware::{resolve_org_context, AuthUser},
32 models::{ObservabilityDashboard, ObservabilitySavedQuery},
33 AppState,
34};
35
36#[derive(Debug, Deserialize)]
37pub struct ListQueriesQuery {
38 #[serde(default)]
39 pub kind: Option<String>,
40}
41
42pub async fn list_saved_queries(
44 State(state): State<AppState>,
45 AuthUser(user_id): AuthUser,
46 Path(org_id): Path<Uuid>,
47 Query(query): Query<ListQueriesQuery>,
48 headers: HeaderMap,
49) -> ApiResult<Json<Vec<ObservabilitySavedQuery>>> {
50 authorize_org(&state, user_id, &headers, org_id).await?;
51 let rows = ObservabilitySavedQuery::list_by_org(state.db.pool(), org_id, query.kind.as_deref())
52 .await
53 .map_err(ApiError::Database)?;
54 Ok(Json(rows))
55}
56
57#[derive(Debug, Deserialize)]
58pub struct CreateSavedQueryRequest {
59 pub name: String,
60 #[serde(default)]
61 pub description: Option<String>,
62 pub kind: String,
63 pub filters: serde_json::Value,
64 #[serde(default)]
65 pub workspace_id: Option<Uuid>,
66}
67
68pub async fn create_saved_query(
70 State(state): State<AppState>,
71 AuthUser(user_id): AuthUser,
72 Path(org_id): Path<Uuid>,
73 headers: HeaderMap,
74 Json(request): Json<CreateSavedQueryRequest>,
75) -> ApiResult<Json<ObservabilitySavedQuery>> {
76 authorize_org(&state, user_id, &headers, org_id).await?;
77 if request.name.trim().is_empty() {
78 return Err(ApiError::InvalidRequest("name must not be empty".into()));
79 }
80 if !ObservabilitySavedQuery::is_valid_kind(&request.kind) {
81 return Err(ApiError::InvalidRequest(format!(
82 "kind must be one of: {}",
83 ObservabilitySavedQuery::VALID_KINDS.join(", ")
84 )));
85 }
86
87 let row = ObservabilitySavedQuery::create(
88 state.db.pool(),
89 CreateSavedQuery {
90 org_id,
91 workspace_id: request.workspace_id,
92 name: &request.name,
93 description: request.description.as_deref(),
94 kind: &request.kind,
95 filters: &request.filters,
96 created_by: Some(user_id),
97 },
98 )
99 .await
100 .map_err(ApiError::Database)?;
101 Ok(Json(row))
102}
103
104#[derive(Debug, Deserialize)]
105pub struct UpdateSavedQueryRequest {
106 #[serde(default)]
107 pub name: Option<String>,
108 #[serde(default)]
109 pub filters: Option<serde_json::Value>,
110}
111
112pub async fn update_saved_query(
114 State(state): State<AppState>,
115 AuthUser(user_id): AuthUser,
116 Path(id): Path<Uuid>,
117 headers: HeaderMap,
118 Json(request): Json<UpdateSavedQueryRequest>,
119) -> ApiResult<Json<ObservabilitySavedQuery>> {
120 let existing = load_authorized_query(&state, user_id, &headers, id).await?;
121 let _ = existing;
122 let updated = ObservabilitySavedQuery::update(
123 state.db.pool(),
124 id,
125 request.name.as_deref(),
126 request.filters.as_ref(),
127 )
128 .await
129 .map_err(ApiError::Database)?
130 .ok_or_else(|| ApiError::InvalidRequest("Saved query not found".into()))?;
131 Ok(Json(updated))
132}
133
134pub async fn delete_saved_query(
136 State(state): State<AppState>,
137 AuthUser(user_id): AuthUser,
138 Path(id): Path<Uuid>,
139 headers: HeaderMap,
140) -> ApiResult<Json<serde_json::Value>> {
141 load_authorized_query(&state, user_id, &headers, id).await?;
142 let deleted = ObservabilitySavedQuery::delete(state.db.pool(), id)
143 .await
144 .map_err(ApiError::Database)?;
145 if !deleted {
146 return Err(ApiError::InvalidRequest("Saved query not found".into()));
147 }
148 Ok(Json(serde_json::json!({ "deleted": true })))
149}
150
151pub async fn list_dashboards(
155 State(state): State<AppState>,
156 AuthUser(user_id): AuthUser,
157 Path(org_id): Path<Uuid>,
158 headers: HeaderMap,
159) -> ApiResult<Json<Vec<ObservabilityDashboard>>> {
160 authorize_org(&state, user_id, &headers, org_id).await?;
161 let rows = ObservabilityDashboard::list_by_org(state.db.pool(), org_id)
162 .await
163 .map_err(ApiError::Database)?;
164 Ok(Json(rows))
165}
166
167#[derive(Debug, Deserialize)]
168pub struct CreateDashboardRequest {
169 pub name: String,
170 #[serde(default)]
171 pub description: Option<String>,
172 pub layout: serde_json::Value,
173 pub queries: serde_json::Value,
174 #[serde(default)]
175 pub workspace_id: Option<Uuid>,
176}
177
178pub async fn create_dashboard(
180 State(state): State<AppState>,
181 AuthUser(user_id): AuthUser,
182 Path(org_id): Path<Uuid>,
183 headers: HeaderMap,
184 Json(request): Json<CreateDashboardRequest>,
185) -> ApiResult<Json<ObservabilityDashboard>> {
186 authorize_org(&state, user_id, &headers, org_id).await?;
187 if request.name.trim().is_empty() {
188 return Err(ApiError::InvalidRequest("name must not be empty".into()));
189 }
190 let row = ObservabilityDashboard::create(
191 state.db.pool(),
192 CreateDashboard {
193 org_id,
194 workspace_id: request.workspace_id,
195 name: &request.name,
196 description: request.description.as_deref(),
197 layout: &request.layout,
198 queries: &request.queries,
199 created_by: Some(user_id),
200 },
201 )
202 .await
203 .map_err(ApiError::Database)?;
204 Ok(Json(row))
205}
206
207#[derive(Debug, Deserialize)]
208pub struct UpdateDashboardRequest {
209 #[serde(default)]
210 pub name: Option<String>,
211 #[serde(default)]
212 pub layout: Option<serde_json::Value>,
213 #[serde(default)]
214 pub queries: Option<serde_json::Value>,
215}
216
217pub async fn update_dashboard(
219 State(state): State<AppState>,
220 AuthUser(user_id): AuthUser,
221 Path(id): Path<Uuid>,
222 headers: HeaderMap,
223 Json(request): Json<UpdateDashboardRequest>,
224) -> ApiResult<Json<ObservabilityDashboard>> {
225 load_authorized_dashboard(&state, user_id, &headers, id).await?;
226 let updated = ObservabilityDashboard::update(
227 state.db.pool(),
228 id,
229 request.name.as_deref(),
230 request.layout.as_ref(),
231 request.queries.as_ref(),
232 )
233 .await
234 .map_err(ApiError::Database)?
235 .ok_or_else(|| ApiError::InvalidRequest("Dashboard not found".into()))?;
236 Ok(Json(updated))
237}
238
239pub async fn delete_dashboard(
241 State(state): State<AppState>,
242 AuthUser(user_id): AuthUser,
243 Path(id): Path<Uuid>,
244 headers: HeaderMap,
245) -> ApiResult<Json<serde_json::Value>> {
246 load_authorized_dashboard(&state, user_id, &headers, id).await?;
247 let deleted = ObservabilityDashboard::delete(state.db.pool(), id)
248 .await
249 .map_err(ApiError::Database)?;
250 if !deleted {
251 return Err(ApiError::InvalidRequest("Dashboard not found".into()));
252 }
253 Ok(Json(serde_json::json!({ "deleted": true })))
254}
255
256async fn authorize_org(
257 state: &AppState,
258 user_id: Uuid,
259 headers: &HeaderMap,
260 org_id: Uuid,
261) -> ApiResult<()> {
262 let ctx = resolve_org_context(state, user_id, headers, None)
263 .await
264 .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
265 if ctx.org_id != org_id {
266 return Err(ApiError::InvalidRequest(
267 "Cannot access observability for a different org".into(),
268 ));
269 }
270 Ok(())
271}
272
273async fn load_authorized_query(
274 state: &AppState,
275 user_id: Uuid,
276 headers: &HeaderMap,
277 id: Uuid,
278) -> ApiResult<ObservabilitySavedQuery> {
279 let row = ObservabilitySavedQuery::find_by_id(state.db.pool(), id)
280 .await
281 .map_err(ApiError::Database)?
282 .ok_or_else(|| ApiError::InvalidRequest("Saved query not found".into()))?;
283 let ctx = resolve_org_context(state, user_id, headers, None)
284 .await
285 .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
286 if ctx.org_id != row.org_id {
287 return Err(ApiError::InvalidRequest("Saved query not found".into()));
288 }
289 Ok(row)
290}
291
292async fn load_authorized_dashboard(
293 state: &AppState,
294 user_id: Uuid,
295 headers: &HeaderMap,
296 id: Uuid,
297) -> ApiResult<ObservabilityDashboard> {
298 let row = ObservabilityDashboard::find_by_id(state.db.pool(), id)
299 .await
300 .map_err(ApiError::Database)?
301 .ok_or_else(|| ApiError::InvalidRequest("Dashboard not found".into()))?;
302 let ctx = resolve_org_context(state, user_id, headers, None)
303 .await
304 .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
305 if ctx.org_id != row.org_id {
306 return Err(ApiError::InvalidRequest("Dashboard not found".into()));
307 }
308 Ok(row)
309}
310
311#[derive(Debug, serde::Serialize, sqlx::FromRow)]
316pub struct TraceSpanRow {
317 pub deployment_id: Uuid,
318 pub trace_id: String,
319 pub span_id: String,
320 pub parent_span_id: Option<String>,
321 pub service_name: Option<String>,
322 pub name: String,
323 pub kind: Option<i16>,
324 pub start_unix_nano: i64,
325 pub end_unix_nano: i64,
326 pub occurred_at: chrono::DateTime<chrono::Utc>,
327 pub status_code: Option<i16>,
328 pub status_message: Option<String>,
329 pub attributes: serde_json::Value,
330}
331
332#[derive(Debug, Deserialize)]
333pub struct TraceQueryRequest {
334 #[serde(default)]
336 pub deployment_id: Option<Uuid>,
337 #[serde(default)]
339 pub service_name: Option<String>,
340 #[serde(default)]
342 pub name_contains: Option<String>,
343 #[serde(default)]
345 pub status: Option<String>,
346 #[serde(default)]
348 pub since: Option<chrono::DateTime<chrono::Utc>>,
349 #[serde(default)]
351 pub until: Option<chrono::DateTime<chrono::Utc>>,
352 #[serde(default)]
354 pub limit: Option<i64>,
355}
356
357pub async fn query_traces(
365 State(state): State<AppState>,
366 AuthUser(user_id): AuthUser,
367 Path(org_id): Path<Uuid>,
368 headers: HeaderMap,
369 Json(req): Json<TraceQueryRequest>,
370) -> ApiResult<Json<Vec<TraceSpanRow>>> {
371 authorize_org(&state, user_id, &headers, org_id).await?;
372
373 let limit = req.limit.unwrap_or(200).clamp(1, 500);
374 let until = req.until.unwrap_or_else(chrono::Utc::now);
375 let since = req.since.unwrap_or_else(|| chrono::Utc::now() - chrono::Duration::hours(1));
376 if until < since {
377 return Err(ApiError::InvalidRequest("until must be >= since".into()));
378 }
379
380 let status_filter: Option<i16> = match req.status.as_deref() {
381 Some("ok") => Some(1),
382 Some("error") => Some(2),
383 Some("any") | None => None,
384 Some(other) => {
385 return Err(ApiError::InvalidRequest(format!(
386 "status must be 'ok', 'error', or 'any' — got '{other}'"
387 )));
388 }
389 };
390
391 let name_pattern: Option<String> = req
392 .name_contains
393 .as_ref()
394 .map(|s| format!("%{}%", s.replace('%', r"\%").replace('_', r"\_")));
395
396 let rows = sqlx::query_as::<_, TraceSpanRow>(
397 r#"
398 SELECT t.deployment_id, t.trace_id, t.span_id, t.parent_span_id,
399 t.service_name, t.name, t.kind,
400 t.start_unix_nano, t.end_unix_nano, t.occurred_at,
401 t.status_code, t.status_message, t.attributes
402 FROM runtime_traces t
403 JOIN hosted_mocks d ON d.id = t.deployment_id
404 WHERE d.org_id = $1
405 AND t.occurred_at >= $2
406 AND t.occurred_at <= $3
407 AND ($4::uuid IS NULL OR t.deployment_id = $4)
408 AND ($5::text IS NULL OR t.service_name = $5)
409 AND ($6::text IS NULL OR t.name ILIKE $6)
410 AND ($7::int2 IS NULL OR t.status_code = $7)
411 ORDER BY t.occurred_at DESC
412 LIMIT $8
413 "#,
414 )
415 .bind(org_id)
416 .bind(since)
417 .bind(until)
418 .bind(req.deployment_id)
419 .bind(req.service_name)
420 .bind(name_pattern)
421 .bind(status_filter)
422 .bind(limit)
423 .fetch_all(state.db.pool())
424 .await
425 .map_err(ApiError::Database)?;
426
427 Ok(Json(rows))
428}
429
430#[derive(Debug, serde::Serialize)]
435pub struct QueryBucket {
436 pub label: String,
437 pub count: i64,
438}
439
440#[derive(Debug, serde::Serialize)]
441pub struct ExecuteQueryResponse {
442 pub metric: String,
443 pub total: i64,
444 pub window_minutes: i64,
445 pub series: Vec<QueryBucket>,
446}
447
448#[derive(Debug, Deserialize, Default)]
449pub struct ExecuteQueryRequest {
450 #[serde(default)]
453 pub workspace_id: Option<Uuid>,
454 #[serde(default)]
457 pub window_minutes: Option<i64>,
458}
459
460pub async fn execute_saved_query(
468 State(state): State<AppState>,
469 AuthUser(user_id): AuthUser,
470 Path(id): Path<Uuid>,
471 headers: HeaderMap,
472 body: Option<Json<ExecuteQueryRequest>>,
473) -> ApiResult<Json<ExecuteQueryResponse>> {
474 let req = body.map(|Json(b)| b).unwrap_or_default();
475 let query = load_authorized_query(&state, user_id, &headers, id).await?;
476
477 let kind = query
478 .filters
479 .get("kind")
480 .and_then(|v| v.as_str())
481 .ok_or_else(|| {
482 ApiError::InvalidRequest("Saved query filters missing required `kind`".into())
483 })?
484 .to_string();
485 let window_minutes = req
486 .window_minutes
487 .or_else(|| query.filters.get("window_minutes").and_then(|v| v.as_i64()))
488 .unwrap_or(15)
489 .clamp(1, 24 * 60);
490 let workspace_filter = req.workspace_id.or(query.workspace_id);
491
492 let pool = state.db.pool();
493 let response = match kind.as_str() {
494 "request_count" => run_request_count(pool, query.org_id, workspace_filter, window_minutes)
495 .await
496 .map_err(ApiError::Database)?,
497 "request_count_by_status" => {
498 run_request_count_by_status(pool, query.org_id, workspace_filter, window_minutes)
499 .await
500 .map_err(ApiError::Database)?
501 }
502 "incident_count" => {
503 run_incident_count(pool, query.org_id, workspace_filter, window_minutes)
504 .await
505 .map_err(ApiError::Database)?
506 }
507 other => {
508 return Err(ApiError::InvalidRequest(format!(
509 "Unsupported saved-query kind '{other}'. Supported: request_count, request_count_by_status, incident_count"
510 )));
511 }
512 };
513
514 Ok(Json(response))
515}
516
517async fn run_request_count(
525 pool: &sqlx::PgPool,
526 org_id: Uuid,
527 workspace_id: Option<Uuid>,
528 window_minutes: i64,
529) -> sqlx::Result<ExecuteQueryResponse> {
530 let total: i64 = if let Some(ws) = workspace_id {
531 sqlx::query_scalar(
532 r#"
533 SELECT COUNT(*)::BIGINT
534 FROM runtime_captures
535 WHERE workspace_id = $1
536 AND occurred_at >= NOW() - make_interval(mins => $2::int)
537 "#,
538 )
539 .bind(ws)
540 .bind(window_minutes as i32)
541 .fetch_one(pool)
542 .await?
543 } else {
544 sqlx::query_scalar(
545 r#"
546 SELECT COUNT(*)::BIGINT
547 FROM runtime_captures rc
548 JOIN hosted_mocks hm ON hm.id = rc.deployment_id
549 WHERE hm.org_id = $1
550 AND rc.occurred_at >= NOW() - make_interval(mins => $2::int)
551 "#,
552 )
553 .bind(org_id)
554 .bind(window_minutes as i32)
555 .fetch_one(pool)
556 .await?
557 };
558
559 Ok(ExecuteQueryResponse {
560 metric: "request_count".into(),
561 total,
562 window_minutes,
563 series: vec![QueryBucket {
564 label: "all".into(),
565 count: total,
566 }],
567 })
568}
569
570async fn run_request_count_by_status(
571 pool: &sqlx::PgPool,
572 org_id: Uuid,
573 workspace_id: Option<Uuid>,
574 window_minutes: i64,
575) -> sqlx::Result<ExecuteQueryResponse> {
576 let rows: Vec<(Option<i32>, i64)> = if let Some(ws) = workspace_id {
580 sqlx::query_as(
581 r#"
582 SELECT COALESCE(response_status_code, status_code) AS status,
583 COUNT(*)::BIGINT
584 FROM runtime_captures
585 WHERE workspace_id = $1
586 AND occurred_at >= NOW() - make_interval(mins => $2::int)
587 GROUP BY status
588 ORDER BY status NULLS LAST
589 "#,
590 )
591 .bind(ws)
592 .bind(window_minutes as i32)
593 .fetch_all(pool)
594 .await?
595 } else {
596 sqlx::query_as(
597 r#"
598 SELECT COALESCE(rc.response_status_code, rc.status_code) AS status,
599 COUNT(*)::BIGINT
600 FROM runtime_captures rc
601 JOIN hosted_mocks hm ON hm.id = rc.deployment_id
602 WHERE hm.org_id = $1
603 AND rc.occurred_at >= NOW() - make_interval(mins => $2::int)
604 GROUP BY status
605 ORDER BY status NULLS LAST
606 "#,
607 )
608 .bind(org_id)
609 .bind(window_minutes as i32)
610 .fetch_all(pool)
611 .await?
612 };
613
614 let total = rows.iter().map(|(_, c)| *c).sum();
615 let series = rows
616 .into_iter()
617 .map(|(status, count)| QueryBucket {
618 label: status.map(|s| s.to_string()).unwrap_or_else(|| "unknown".into()),
619 count,
620 })
621 .collect();
622
623 Ok(ExecuteQueryResponse {
624 metric: "request_count_by_status".into(),
625 total,
626 window_minutes,
627 series,
628 })
629}
630
631async fn run_incident_count(
632 pool: &sqlx::PgPool,
633 org_id: Uuid,
634 workspace_id: Option<Uuid>,
635 window_minutes: i64,
636) -> sqlx::Result<ExecuteQueryResponse> {
637 let rows: Vec<(String, i64)> = if let Some(ws) = workspace_id {
638 sqlx::query_as(
639 r#"
640 SELECT severity, COUNT(*)::BIGINT
641 FROM incidents
642 WHERE workspace_id = $1
643 AND created_at >= NOW() - make_interval(mins => $2::int)
644 AND status != 'resolved'
645 GROUP BY severity
646 ORDER BY CASE severity
647 WHEN 'critical' THEN 0
648 WHEN 'high' THEN 1
649 WHEN 'medium' THEN 2
650 WHEN 'low' THEN 3
651 ELSE 4
652 END
653 "#,
654 )
655 .bind(ws)
656 .bind(window_minutes as i32)
657 .fetch_all(pool)
658 .await?
659 } else {
660 sqlx::query_as(
661 r#"
662 SELECT severity, COUNT(*)::BIGINT
663 FROM incidents
664 WHERE org_id = $1
665 AND created_at >= NOW() - make_interval(mins => $2::int)
666 AND status != 'resolved'
667 GROUP BY severity
668 ORDER BY CASE severity
669 WHEN 'critical' THEN 0
670 WHEN 'high' THEN 1
671 WHEN 'medium' THEN 2
672 WHEN 'low' THEN 3
673 ELSE 4
674 END
675 "#,
676 )
677 .bind(org_id)
678 .bind(window_minutes as i32)
679 .fetch_all(pool)
680 .await?
681 };
682
683 let total = rows.iter().map(|(_, c)| *c).sum();
684 let series = rows
685 .into_iter()
686 .map(|(severity, count)| QueryBucket {
687 label: severity,
688 count,
689 })
690 .collect();
691
692 Ok(ExecuteQueryResponse {
693 metric: "incident_count".into(),
694 total,
695 window_minutes,
696 series,
697 })
698}