tandem-server 0.5.6

HTTP server for Tandem engine APIs
use std::convert::Infallible;

use axum::body::{Body, Bytes};
use axum::extract::{Extension, State};
use axum::http::{header, HeaderMap, HeaderValue, StatusCode};
use axum::response::{IntoResponse, Response};
use serde_json::{json, Value};
use tandem_types::{EngineEvent, RequestPrincipal};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;

use crate::AppState;

pub(crate) async fn audit_stream(
    State(state): State<AppState>,
    Extension(principal): Extension<RequestPrincipal>,
    headers: HeaderMap,
) -> Response {
    if !audit_admin_allowed(&principal, &headers) {
        return (
            StatusCode::FORBIDDEN,
            axum::Json(json!({
                "error": "Admin capability required",
                "code": "AUDIT_ADMIN_REQUIRED"
            })),
        )
            .into_response();
    }

    let rx = state.event_bus.subscribe();
    let stream = BroadcastStream::new(rx).filter_map(|msg| match msg {
        Ok(event) => audit_event_to_stream_record(&event).map(|record| {
            let line = serde_json::to_string(&record).unwrap_or_else(|_| "{}".to_string()) + "\n";
            Ok::<Bytes, Infallible>(Bytes::from(line))
        }),
        Err(_) => None,
    });

    let mut response = Body::from_stream(stream).into_response();
    response.headers_mut().insert(
        header::CONTENT_TYPE,
        HeaderValue::from_static("application/x-ndjson"),
    );
    response
}

fn audit_admin_allowed(principal: &RequestPrincipal, headers: &HeaderMap) -> bool {
    let tier = headers
        .get("x-tandem-capability-tier")
        .and_then(|value| value.to_str().ok())
        .unwrap_or_default()
        .trim()
        .to_ascii_lowercase();
    if matches!(tier.as_str(), "admin" | "reconfigure") {
        return true;
    }
    headers
        .get("x-tandem-admin")
        .and_then(|value| value.to_str().ok())
        .is_some_and(|value| matches!(value.trim(), "1" | "true" | "yes"))
        || matches!(principal.source.as_str(), "api_token" | "control_panel")
}

pub(crate) fn audit_event_to_stream_record(event: &EngineEvent) -> Option<Value> {
    match event.event_type.as_str() {
        "tool.effect.recorded" => tool_effect_record(event),
        "approval.decision.recorded" => approval_decision_record(event),
        "channel.capability.changed" => capability_change_record(event),
        _ => None,
    }
}

fn base_record(event: &EngineEvent, command: &str, result: Value) -> Value {
    json!({
        "event_type": event.event_type,
        "actor_id": event.properties.get("actor_id").and_then(Value::as_str),
        "executed_as": event.properties.get("executed_as").and_then(Value::as_str).unwrap_or("tandem-server"),
        "command": command,
        "workspace": event.properties.get("workspace").and_then(Value::as_str),
        "tool_call_id": event.properties.get("tool_call_id").and_then(Value::as_str),
        "result": result,
        "timestamp": crate::now_ms(),
        "channel": event.properties.get("channel").and_then(Value::as_str),
    })
}

fn tool_effect_record(event: &EngineEvent) -> Option<Value> {
    let record = event.properties.get("record")?;
    let command = record.get("tool").and_then(Value::as_str).unwrap_or("tool");
    let workspace = record
        .pointer("/args_summary/workspace_root")
        .and_then(Value::as_str);
    let mut row = base_record(
        event,
        command,
        json!({
            "phase": record.get("phase"),
            "status": record.get("status"),
            "error": record.get("error"),
        }),
    );
    let obj = row.as_object_mut()?;
    if let Some(workspace) = workspace {
        obj.insert(
            "workspace".to_string(),
            Value::String(workspace.to_string()),
        );
    }
    if let Some(tool_call_id) = record.get("tool_call_id").and_then(Value::as_str) {
        obj.insert(
            "tool_call_id".to_string(),
            Value::String(tool_call_id.to_string()),
        );
    }
    Some(row)
}

fn approval_decision_record(event: &EngineEvent) -> Option<Value> {
    Some(base_record(
        event,
        "approval_decision",
        json!({
            "run_id": event.properties.get("run_id"),
            "node_id": event.properties.get("node_id"),
            "decision": event.properties.get("decision"),
            "reason": event.properties.get("reason"),
        }),
    ))
}

fn capability_change_record(event: &EngineEvent) -> Option<Value> {
    Some(base_record(
        event,
        "capability_change",
        json!({
            "channel": event.properties.get("channel"),
            "user_id": event.properties.get("user_id"),
            "max_tier": event.properties.get("max_tier"),
        }),
    ))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn maps_tool_effect_event_to_ndjson_record_shape() {
        let event = EngineEvent::new(
            "tool.effect.recorded",
            json!({
                "record": {
                    "tool": "read",
                    "tool_call_id": "call-1",
                    "phase": "outcome",
                    "status": "succeeded",
                    "args_summary": { "workspace_root": "/workspace/acme" }
                }
            }),
        );
        let row = audit_event_to_stream_record(&event).unwrap();
        assert_eq!(row["command"], "read");
        assert_eq!(row["workspace"], "/workspace/acme");
        assert_eq!(row["tool_call_id"], "call-1");
    }

    #[test]
    fn maps_capability_change_event_to_audit_record() {
        let event = EngineEvent::new(
            "channel.capability.changed",
            json!({
                "channel": "telegram",
                "user_id": "42",
                "max_tier": "approve"
            }),
        );
        let row = audit_event_to_stream_record(&event).unwrap();
        assert_eq!(row["command"], "capability_change");
        assert_eq!(row["channel"], "telegram");
        assert_eq!(row["result"]["max_tier"], "approve");
    }
}