tandem-server 0.4.23

HTTP server for Tandem engine APIs
Documentation
use anyhow::Result;
use serde_json::Value;

use crate::app::state::{sha256_hex, truncate_text, AppState};
use crate::bug_monitor::types::BugMonitorIncidentRecord;
use crate::bug_monitor::types::{BugMonitorConfig, BugMonitorSubmission};
use crate::EngineEvent;

pub async fn collect_bug_monitor_excerpt(state: &AppState, properties: &Value) -> Vec<String> {
    let mut excerpt = Vec::new();
    if let Some(reason) = first_string(properties, &["reason", "error", "detail", "message"]) {
        excerpt.push(reason);
    }
    if let Some(title) = first_string(properties, &["title", "task"]) {
        if !excerpt.iter().any(|row| row == &title) {
            excerpt.push(title);
        }
    }
    let logs = state.logs.read().await;
    for entry in logs.iter().rev().take(3) {
        if let Some(message) = entry.get("message").and_then(|row| row.as_str()) {
            excerpt.push(truncate_text(message, 240));
        }
    }
    excerpt.truncate(8);
    excerpt
}

pub async fn process_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission = build_bug_monitor_submission_from_event(state, config, event).await?;
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = crate::util::time::now_ms();

    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    state.put_bug_monitor_incident(incident.clone()).await?;

    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = crate::util::time::now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = crate::util::time::now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    if let Some(draft_id) = incident.draft_id.clone() {
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    incident.updated_at_ms = crate::util::time::now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
pub fn first_string(properties: &Value, keys: &[&str]) -> Option<String> {
    for key in keys {
        if let Some(value) = properties.get(*key).and_then(|row| row.as_str()) {
            let trimmed = value.trim();
            if !trimmed.is_empty() {
                return Some(trimmed.to_string());
            }
        }
    }
    None
}

pub async fn build_bug_monitor_submission_from_event(
    state: &AppState,
    config: &BugMonitorConfig,
    event: &EngineEvent,
) -> Result<BugMonitorSubmission> {
    let repo = config
        .repo
        .clone()
        .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let reason = first_string(
        &event.properties,
        &["reason", "error", "detail", "message", "summary"],
    );
    let run_id = first_string(&event.properties, &["runID", "run_id"]);
    let session_id = first_string(&event.properties, &["sessionID", "session_id"]);
    let correlation_id = first_string(
        &event.properties,
        &["correlationID", "correlation_id", "commandID", "command_id"],
    );
    let component = first_string(
        &event.properties,
        &[
            "component",
            "routineID",
            "routine_id",
            "workflowID",
            "workflow_id",
            "task",
            "title",
        ],
    );
    let mut excerpt = collect_bug_monitor_excerpt(state, &event.properties).await;
    if excerpt.is_empty() {
        if let Some(reason) = reason.as_ref() {
            excerpt.push(reason.clone());
        }
    }
    let serialized = serde_json::to_string(&event.properties).unwrap_or_default();
    let fingerprint = sha256_hex(&[
        repo.as_str(),
        workspace_root.as_str(),
        event.event_type.as_str(),
        reason.as_deref().unwrap_or(""),
        run_id.as_deref().unwrap_or(""),
        session_id.as_deref().unwrap_or(""),
        correlation_id.as_deref().unwrap_or(""),
        component.as_deref().unwrap_or(""),
        serialized.as_str(),
    ]);
    let title = if let Some(component) = component.as_ref() {
        format!("{} failure in {}", event.event_type, component)
    } else {
        format!("{} detected", event.event_type)
    };
    let mut detail_lines = vec![
        format!("event_type: {}", event.event_type),
        format!("workspace_root: {}", workspace_root),
    ];
    if let Some(reason) = reason.as_ref() {
        detail_lines.push(format!("reason: {reason}"));
    }
    if let Some(run_id) = run_id.as_ref() {
        detail_lines.push(format!("run_id: {run_id}"));
    }
    if let Some(session_id) = session_id.as_ref() {
        detail_lines.push(format!("session_id: {session_id}"));
    }
    if let Some(correlation_id) = correlation_id.as_ref() {
        detail_lines.push(format!("correlation_id: {correlation_id}"));
    }
    if let Some(component) = component.as_ref() {
        detail_lines.push(format!("component: {component}"));
    }
    if !serialized.trim().is_empty() {
        detail_lines.push(String::new());
        detail_lines.push("payload:".to_string());
        detail_lines.push(truncate_text(&serialized, 2_000));
    }

    Ok(BugMonitorSubmission {
        repo: Some(repo),
        title: Some(title),
        detail: Some(detail_lines.join("\n")),
        source: Some("tandem_events".to_string()),
        run_id,
        session_id,
        correlation_id,
        file_name: None,
        process: Some("tandem-engine".to_string()),
        component,
        event: Some(event.event_type.clone()),
        level: Some("error".to_string()),
        excerpt,
        fingerprint: Some(fingerprint),
    })
}