roboticus-api 0.11.3

HTTP routes, WebSocket, auth, rate limiting, and dashboard for the Roboticus agent runtime
Documentation
//! # traces
//!
//! REST endpoints for reading pipeline traces and ReAct (Flight Recorder) traces.
//!
//! `GET /api/traces/{turn_id}`         — pipeline trace with per-stage timing.
//! `GET /api/traces/{turn_id}/react`   — Flight Recorder ReAct detail.
//! `GET /api/traces/{turn_id}/export`  — full trace export (JSON download).
//! `GET /api/traces/{turn_id}/flow`    — decision flow graph for visualization.

use axum::{
    extract::{Path, State},
    http::header,
    response::IntoResponse,
};

use super::{AppState, JsonError, internal_err, not_found};

/// GET /api/traces/:turn_id — pipeline trace with stage timing.
pub async fn get_trace(
    State(state): State<AppState>,
    Path(turn_id): Path<String>,
) -> Result<impl IntoResponse, JsonError> {
    let row = roboticus_db::traces::get_pipeline_trace(&state.db, &turn_id)
        .map_err(|e| internal_err(&e))?
        .ok_or_else(|| not_found(format!("no trace for turn {turn_id}")))?;

    let stages: serde_json::Value =
        serde_json::from_str(&row.stages_json).unwrap_or(serde_json::Value::Array(vec![]));

    Ok(axum::Json(serde_json::json!({
        "id": row.id,
        "turn_id": row.turn_id,
        "session_id": row.session_id,
        "channel": row.channel,
        "total_ms": row.total_ms,
        "stages": stages,
    })))
}

/// GET /api/traces/:turn_id/react — Flight Recorder ReAct detail.
pub async fn get_react_trace_handler(
    State(state): State<AppState>,
    Path(turn_id): Path<String>,
) -> Result<impl IntoResponse, JsonError> {
    let react_json = roboticus_db::traces::get_react_trace(&state.db, &turn_id)
        .map_err(|e| internal_err(&e))?
        .ok_or_else(|| not_found(format!("no react trace for turn {turn_id}")))?;

    let parsed: serde_json::Value =
        serde_json::from_str(&react_json).map_err(|e| internal_err(&e))?;

    Ok(axum::Json(parsed))
}

/// GET /api/traces/:turn_id/export — full trace export as downloadable JSON.
///
/// Returns all trace data (stages, react steps, inference params) as a
/// single JSON document with Content-Disposition header for download.
pub async fn export_trace(
    State(state): State<AppState>,
    Path(turn_id): Path<String>,
) -> Result<impl IntoResponse, JsonError> {
    let row = roboticus_db::traces::get_full_trace_for_export(&state.db, &turn_id)
        .map_err(|e| internal_err(&e))?
        .ok_or_else(|| not_found(format!("no trace for turn {turn_id}")))?;

    // Parse JSON columns into values for a clean export
    let stages: serde_json::Value =
        serde_json::from_str(&row.stages_json).unwrap_or(serde_json::Value::Array(vec![]));
    let react: serde_json::Value = row
        .react_trace_json
        .as_deref()
        .and_then(|s| serde_json::from_str(s).ok())
        .unwrap_or(serde_json::Value::Null);
    let inference_params: serde_json::Value = row
        .inference_params_json
        .as_deref()
        .and_then(|s| serde_json::from_str(s).ok())
        .unwrap_or(serde_json::Value::Null);

    let export = serde_json::json!({
        "id": row.id,
        "turn_id": row.turn_id,
        "session_id": row.session_id,
        "channel": row.channel,
        "total_ms": row.total_ms,
        "created_at": row.created_at,
        "stages": stages,
        "react_trace": react,
        "inference_params": inference_params,
    });

    let body = serde_json::to_string_pretty(&export).map_err(|e| internal_err(&e))?;
    let filename = format!("trace-{turn_id}.json");

    Ok((
        [
            (header::CONTENT_TYPE, "application/json".to_string()),
            (
                header::CONTENT_DISPOSITION,
                format!("attachment; filename=\"{filename}\""),
            ),
        ],
        body,
    ))
}

// ── Flow graph helpers ────────────────────────────────────────

/// Human-readable label for a pipeline stage name.
fn stage_label(name: &str) -> &str {
    match name {
        "input_validation" => "Input Validation",
        "injection_defense" => "Injection Defense",
        "dedup_tracking" => "Dedup Check",
        "session_resolution" => "Session Resolution",
        "task_operating_state" => "Intent & Planning",
        "decomposition_gate" => "Decomposition Gate",
        "delegated_execution" => "Delegation",
        "shortcut_dispatch" => "Shortcut Check",
        "cache_check" => "Cache Lookup",
        "model_selection" => "Model Selection",
        "inference" => "LLM Inference",
        "guard_chain" => "Guard Chain",
        "post_turn_ingest" => "Memory Ingest",
        "nickname_refinement" => "Nickname Refinement",
        other => other,
    }
}

/// Derive a flow status string from stage outcome and optional guard data.
fn flow_status(outcome: &str, is_guard: bool, guard_fired: bool) -> &'static str {
    if is_guard && guard_fired {
        "retry"
    } else {
        match outcome {
            "Ok" => "pass",
            "Skipped" => "skip",
            "Error" => "error",
            _ => "executed",
        }
    }
}

/// GET /api/traces/:turn_id/flow — decision flow graph for visualization.
pub async fn get_trace_flow(
    State(state): State<AppState>,
    Path(turn_id): Path<String>,
) -> Result<impl IntoResponse, JsonError> {
    // 1. Read the pipeline trace row.
    let row = roboticus_db::traces::get_pipeline_trace(&state.db, &turn_id)
        .map_err(|e| internal_err(&e))?
        .ok_or_else(|| not_found(format!("no trace for turn {turn_id}")))?;

    // 2. Parse stages.
    let stages: Vec<serde_json::Value> = serde_json::from_str(&row.stages_json).unwrap_or_default();

    // 3. Optionally parse react trace for guard detail.
    let react_steps: Vec<serde_json::Value> =
        roboticus_db::traces::get_react_trace(&state.db, &turn_id)
            .ok()
            .flatten()
            .and_then(|json_str| {
                serde_json::from_str::<serde_json::Value>(&json_str)
                    .ok()
                    .and_then(|v| {
                        v.get("steps")
                            .and_then(|s| serde_json::from_value(s.clone()).ok())
                    })
            })
            .unwrap_or_default();

    // Collect guard steps from react trace.
    let guard_details: Vec<&serde_json::Value> = react_steps
        .iter()
        .filter(|s| {
            let t = s.get("type").and_then(|v| v.as_str()).unwrap_or("");
            t == "guard" || t == "Guard"
        })
        .collect();

    let any_guard_fired = guard_details
        .iter()
        .any(|g| g.get("fired").and_then(|v| v.as_bool()).unwrap_or(false));

    // 4. Build nodes.
    let nodes: Vec<serde_json::Value> = stages
        .iter()
        .map(|stage| {
            let name = stage
                .get("name")
                .and_then(|v| v.as_str())
                .unwrap_or("unknown");
            let duration_ms = stage
                .get("duration_ms")
                .and_then(|v| v.as_i64())
                .unwrap_or(0);
            let outcome = stage
                .get("outcome")
                .and_then(|v| v.as_str())
                .unwrap_or("Ok");
            let annotations = stage
                .get("annotations")
                .cloned()
                .unwrap_or(serde_json::Value::Object(Default::default()));

            let is_guard = name == "guard_chain";
            let status = flow_status(outcome, is_guard, any_guard_fired);

            let mut detail = annotations;

            // Enrich guard_chain node with react guard details.
            if is_guard && !guard_details.is_empty() {
                let guards: Vec<serde_json::Value> = guard_details
                    .iter()
                    .map(|g| {
                        serde_json::json!({
                            "guard_name": g.get("guard_name").or_else(|| g.get("name")),
                            "fired": g.get("fired"),
                            "action": g.get("action"),
                            "rejected_content": g.get("rejected_content"),
                            "replacement_content": g.get("replacement_content"),
                        })
                    })
                    .collect();
                if let serde_json::Value::Object(ref mut map) = detail {
                    map.insert("guards".into(), serde_json::Value::Array(guards));
                }
            }

            serde_json::json!({
                "id": name,
                "label": stage_label(name),
                "status": status,
                "duration_ms": duration_ms,
                "detail": detail,
            })
        })
        .collect();

    Ok(axum::Json(serde_json::json!({
        "turn_id": row.turn_id,
        "total_ms": row.total_ms,
        "nodes": nodes,
    })))
}