tandem-server 0.6.5

HTTP server for Tandem engine APIs
fn execution_error_blocker_category(detail: &str) -> &'static str {
    let lowered = detail.trim().to_ascii_lowercase();
    if lowered.contains("failed to reach provider")
        || lowered.contains("error sending request")
        || lowered.contains("request error")
        || lowered.contains("connect timeout")
        || lowered.contains("connection refused")
        || lowered.contains("dns error")
        || lowered.contains("timed out")
    {
        "provider_connect_timeout"
    } else if lowered.contains("provider returned error")
        || lowered.contains("provider stream chunk error")
        || lowered.contains("provider_server_error")
        || lowered.contains("server error")
    {
        "provider_server_error"
    } else if lowered.contains("authentication") || lowered.contains("unauthorized") {
        "provider_auth"
    } else {
        "execution_error"
    }
}

fn normalize_execution_error_detail(detail: &str) -> String {
    let trimmed = detail.trim();
    if trimmed.is_empty() {
        return "node execution failed before producing a final response".to_string();
    }
    if trimmed.eq_ignore_ascii_case("Provider returned error") {
        return "provider returned error before any node response was recorded".to_string();
    }
    trimmed.to_string()
}

fn execution_error_retry_floor(detail: &str, blocker_category: &str) -> Option<u32> {
    if matches!(
        blocker_category,
        "provider_connect_timeout" | "provider_server_error"
    ) {
        return Some(3);
    }
    let lowered = detail.trim().to_ascii_lowercase();
    if lowered.contains("required output") && lowered.contains("was not created") {
        return Some(3);
    }
    if lowered.contains("truncated source identity")
        || lowered.contains("read the full upstream artifact")
    {
        return Some(3);
    }
    if lowered.contains("connector source artifact only materialized the truncated preview rows")
        || lowered.contains("connector_truncated_preview_only")
    {
        return Some(3);
    }
    None
}

fn automation_node_execution_error_max_attempts(
    node: &crate::automation_v2::types::AutomationFlowNode,
    detail: &str,
    blocker_category: &str,
) -> u32 {
    let configured = crate::app::state::automation_node_max_attempts(node);
    execution_error_retry_floor(detail, blocker_category)
        .map(|floor| configured.max(floor))
        .unwrap_or(configured)
}

fn automation_node_max_attempts_for_recorded_output(
    node: &crate::automation_v2::types::AutomationFlowNode,
    output: Option<&Value>,
) -> u32 {
    let Some(output) = output else {
        return crate::app::state::automation_node_max_attempts(node);
    };
    let detail = output
        .get("blocked_reason")
        .or_else(|| output.get("summary"))
        .and_then(Value::as_str)
        .unwrap_or_default();
    let blocker_category = output
        .get("blocker_category")
        .and_then(Value::as_str)
        .unwrap_or_else(|| execution_error_blocker_category(detail));
    automation_node_execution_error_max_attempts(node, detail, blocker_category)
}

fn automation_node_recorded_attempts_exhausted(
    run: &crate::automation_v2::types::AutomationV2RunRecord,
    node_id: &str,
    node: &crate::automation_v2::types::AutomationFlowNode,
) -> bool {
    let attempts = run
        .checkpoint
        .node_attempts
        .get(node_id)
        .copied()
        .unwrap_or(0);
    let output = run.checkpoint.node_outputs.get(node_id);
    attempts >= automation_node_max_attempts_for_recorded_output(node, output)
}

fn retry_failure_class_from_blocker_category(blocker_category: &str) -> &'static str {
    match blocker_category {
        "provider_connect_timeout" | "provider_server_error" => "provider_transient",
        "provider_auth" => "provider_terminal",
        "tool_resolution_failed" => "tool_resolution",
        _ => "contract_miss",
    }
}

fn automation_node_retry_decision(
    node: &crate::automation_v2::types::AutomationFlowNode,
    detail: &str,
    attempts: u32,
    max_attempts: u32,
    failure_class: &str,
    occurred_at_ms: u64,
) -> tandem_automation::RetryDecision {
    let mut policy =
        tandem_automation::RetryPolicy::from_node_retry_policy(node.retry_policy.as_ref(), max_attempts);
    policy.max_attempts = max_attempts;
    if failure_class == "provider_transient"
        && matches!(
            policy.backoff.strategy,
            tandem_automation::RetryBackoffStrategy::None
        )
    {
        policy.backoff = tandem_automation::RetryBackoffPolicy::transient_provider_default();
    }
    policy.decide(tandem_automation::RetryDecisionInput {
        failure_class,
        reason: detail,
        attempt: attempts,
        occurred_at_ms,
        elapsed_ms: None,
    })
}

fn transient_provider_retry_backoff_ms(detail: &str, attempts: u32) -> Option<u64> {
    match execution_error_blocker_category(detail) {
        "provider_connect_timeout" | "provider_server_error" => {
            tandem_automation::RetryBackoffPolicy::transient_provider_default()
                .delay_ms_for_attempt(attempts)
        }
        _ => None,
    }
}

#[cfg(test)]
mod retry_policy_tests {
    use super::*;
    use serde_json::json;

    fn retry_node() -> crate::automation_v2::types::AutomationFlowNode {
        crate::automation_v2::types::AutomationFlowNode {
            knowledge: tandem_orchestrator::KnowledgeBinding::default(),
            node_id: "retry-node".to_string(),
            agent_id: "agent".to_string(),
            objective: "Retry".to_string(),
            depends_on: Vec::new(),
            input_refs: Vec::new(),
            output_contract: None,
            tool_policy: None,
            mcp_policy: None,
            retry_policy: None,
            timeout_ms: None,
            max_tool_calls: None,
            stage_kind: None,
            gate: None,
            metadata: None,
        }
    }

    #[test]
    fn retry_decision_records_provider_backoff_metadata() {
        let node = retry_node();
        let decision = automation_node_retry_decision(
            &node,
            "provider stream connect timeout after 90000 ms",
            2,
            3,
            "provider_transient",
            1_000,
        );

        assert_eq!(decision.decision, "retry_scheduled");
        assert_eq!(decision.failure_class, "provider_transient");
        assert_eq!(decision.backoff_ms, Some(5_000));
        assert_eq!(decision.next_retry_at_ms, Some(6_000));
        assert!(!decision.terminal);
    }

    #[test]
    fn retry_decision_honors_configured_failure_classes() {
        let mut node = retry_node();
        node.retry_policy = Some(json!({
            "max_attempts": 5,
            "retryable_failure_classes": ["provider_transient"]
        }));

        let decision = automation_node_retry_decision(
            &node,
            "required output was not created",
            1,
            5,
            "contract_miss",
            10,
        );

        assert_eq!(decision.decision, "not_retryable");
        assert!(decision.terminal);
    }
}