roboticus-api 0.11.3

HTTP routes, WebSocket, auth, rate limiting, and dashboard for the Roboticus agent runtime
Documentation
//! # observability
//!
//! REST endpoints for pipeline trace listing and delegation outcome analytics.
//!
//! `GET /api/observability/traces`                    — list traces for a session.
//! `GET /api/observability/traces/:turn_id/waterfall` — stage waterfall for a turn.
//! `GET /api/observability/delegation/outcomes`       — recent delegation outcomes.
//! `GET /api/observability/delegation/stats`          — per-agent delegation stats.

use axum::{
    extract::{Path, Query, State},
    response::IntoResponse,
};
use serde::Deserialize;

use super::{AppState, JsonError, bad_request, internal_err, not_found};

// ── Query parameter types ─────────────────────────────────────

/// Query-string parameters accepted by [`list_traces`].
///
/// Both fields are optional at the deserialization level; `list_traces`
/// itself enforces that `session_id` is present.
#[derive(Debug, Deserialize)]
pub struct TracesQuery {
    /// Session whose traces should be listed. `list_traces` rejects the
    /// request with a bad-request error when this is `None`.
    pub session_id: Option<String>,
    /// Maximum number of traces to return. `list_traces` substitutes 50 when
    /// this is `None` and clamps the value into `1..=500`.
    pub limit: Option<i64>,
}

/// Query-string parameters carrying only an optional row limit.
///
/// Used by [`delegation_outcomes`].
#[derive(Debug, Deserialize)]
pub struct LimitQuery {
    /// Maximum number of rows to return. `delegation_outcomes` substitutes 50
    /// when this is `None` and clamps the value into `1..=500`.
    pub limit: Option<i64>,
}

/// Query-string parameters carrying only an optional look-back window.
///
/// Used by [`delegation_stats`].
#[derive(Debug, Deserialize)]
pub struct HoursQuery {
    /// Size of the look-back window in hours. `delegation_stats` substitutes
    /// 24 when this is `None` and clamps the value into `1..=720`.
    pub hours: Option<i64>,
}

// ── Handlers ─────────────────────────────────────────────────

/// GET /api/observability/traces?session_id=X&limit=N
///
/// Returns pipeline traces for the given session, most recent first.
/// `limit` defaults to 50 and is capped at 500.
pub async fn list_traces(
    State(state): State<AppState>,
    Query(params): Query<TracesQuery>,
) -> Result<impl IntoResponse, JsonError> {
    let session_id = params
        .session_id
        .ok_or_else(|| bad_request("session_id is required"))?;

    let limit = params.limit.unwrap_or(50).clamp(1, 500);

    let rows = roboticus_db::traces::list_pipeline_traces(&state.db, &session_id, limit)
        .map_err(|e| internal_err(&e))?;

    let items: Vec<serde_json::Value> = rows
        .into_iter()
        .map(|row| {
            let stages: serde_json::Value =
                serde_json::from_str(&row.stages_json).unwrap_or(serde_json::Value::Array(vec![]));
            let stage_count = stages.as_array().map(|a| a.len()).unwrap_or(0);
            serde_json::json!({
                "id": row.id,
                "turn_id": row.turn_id,
                "session_id": row.session_id,
                "channel": row.channel,
                "total_ms": row.total_ms,
                "stages": stages,
                "stage_count": stage_count,
                "created_at": row.created_at,
            })
        })
        .collect();

    Ok(axum::Json(serde_json::json!({ "traces": items })))
}

/// GET /api/observability/traces/:turn_id/waterfall
///
/// Produces the stage waterfall for one pipeline turn.
///
/// The stored `stages_json` is parsed leniently (invalid JSON, or JSON that
/// is not an array, yields an empty waterfall). Every array element is
/// normalised to the keys `name`, `duration_ms`, `outcome` and `annotations`:
///
/// * `name` — taken from `name`, else `stage`, else the string `"unknown"`.
/// * `duration_ms` — taken from `duration_ms`, else `ms`, else `null`.
/// * `outcome` — taken from `outcome`, else `status`, else `null`.
/// * `annotations` — taken from `annotations`, else `null`.
///
/// # Errors
///
/// A database failure is mapped through `internal_err`; a missing trace is
/// reported through `not_found`.
pub async fn trace_waterfall(
    State(state): State<AppState>,
    Path(turn_id): Path<String>,
) -> Result<impl IntoResponse, JsonError> {
    /// Clones the value under `primary`, falling back to `alias` only when
    /// `primary` is absent (a present-but-null `primary` wins).
    fn lookup(
        stage: &serde_json::Value,
        primary: &str,
        alias: &str,
    ) -> Option<serde_json::Value> {
        stage.get(primary).or_else(|| stage.get(alias)).cloned()
    }

    let found = roboticus_db::traces::get_pipeline_trace(&state.db, &turn_id)
        .map_err(|e| internal_err(&e))?;
    let row = found.ok_or_else(|| not_found(format!("no trace for turn {turn_id}")))?;

    let stages_raw = serde_json::from_str::<serde_json::Value>(&row.stages_json)
        .unwrap_or_else(|_| serde_json::Value::Array(Vec::new()));

    // `as_array()` is `None` for non-array JSON, so flattening the option
    // gives an empty iteration in that case.
    let waterfall: Vec<serde_json::Value> = stages_raw
        .as_array()
        .into_iter()
        .flatten()
        .map(|stage| {
            let name = lookup(stage, "name", "stage")
                .unwrap_or_else(|| serde_json::Value::String("unknown".into()));
            let duration_ms =
                lookup(stage, "duration_ms", "ms").unwrap_or(serde_json::Value::Null);
            let outcome = lookup(stage, "outcome", "status").unwrap_or(serde_json::Value::Null);
            let annotations = stage
                .get("annotations")
                .cloned()
                .unwrap_or(serde_json::Value::Null);

            serde_json::json!({
                "name": name,
                "duration_ms": duration_ms,
                "outcome": outcome,
                "annotations": annotations,
            })
        })
        .collect();

    let body = serde_json::json!({
        "turn_id": row.turn_id,
        "session_id": row.session_id,
        "channel": row.channel,
        "total_ms": row.total_ms,
        "waterfall": waterfall,
    });
    Ok(axum::Json(body))
}

/// GET /api/observability/delegation/outcomes?limit=N
///
/// Returns recent delegation outcomes across all sessions. `limit` defaults
/// to 50 and is capped at 500.
pub async fn delegation_outcomes(
    State(state): State<AppState>,
    Query(params): Query<LimitQuery>,
) -> Result<impl IntoResponse, JsonError> {
    let limit = params.limit.unwrap_or(50).clamp(1, 500);

    let rows = roboticus_db::delegation::recent_delegation_outcomes(&state.db, limit)
        .map_err(|e| internal_err(&e))?;

    Ok(axum::Json(serde_json::json!({ "outcomes": rows })))
}

/// GET /api/observability/delegation/stats?hours=24
///
/// Returns per-agent delegation statistics over the last `hours` hours.
/// `hours` defaults to 24 and is capped at 720 (30 days).
pub async fn delegation_stats(
    State(state): State<AppState>,
    Query(params): Query<HoursQuery>,
) -> Result<impl IntoResponse, JsonError> {
    let hours = params.hours.unwrap_or(24).clamp(1, 720);

    let stats = roboticus_db::delegation::delegation_stats_by_agent(&state.db, hours)
        .map_err(|e| internal_err(&e))?;

    Ok(axum::Json(serde_json::json!({
        "hours": hours,
        "stats": stats,
    })))
}