tandem-server 0.6.5

HTTP server for Tandem engine APIs
use serde_json::json;

use crate::app::state::automation::{QueueReason, SchedulerMetadata};
use crate::app::state::AppState;
use crate::automation_v2::types::{AutomationRunStatus, AutomationV2RunRecord};

/// A retry-backoff request captured for later application to a run record.
///
/// Filled in by `record_pending_retry_backoff` and consumed by
/// `queue_pending_retry_backoff`, which forwards the fields to
/// `queue_run_for_retry_backoff`.
pub(crate) struct RetryBackoffRequest {
    /// Identifier of the node whose retry is being deferred.
    node_id: String,
    /// Attempt count forwarded when queueing; the next attempt is reported as
    /// this value plus one (saturating).
    attempts: u32,
    /// Attempt limit forwarded when queueing and included in the log line.
    max_attempts: u32,
    /// Owned copy of the retry decision that triggered the backoff.
    retry_decision: tandem_automation::RetryDecision,
    /// Free-form failure description forwarded when queueing.
    detail: String,
    /// Backoff duration in milliseconds. Within this file it is only read for
    /// the log line emitted after queueing; the scheduling itself uses the
    /// values carried by `retry_decision`.
    backoff_ms: u64,
}

/// Reports whether `run` may proceed as far as retry backoff is concerned.
///
/// Only a run whose scheduler metadata has the `RetryBackoff` queue reason and
/// a recorded `retry_after_ms` later than `now_ms` is considered not yet due.
/// Every other case (no scheduler metadata, a different queue reason, or no
/// recorded deadline) yields `true`.
pub(crate) fn retry_backoff_is_due(run: &AutomationV2RunRecord, now_ms: u64) -> bool {
    // Pull out the backoff deadline, if this run is actually waiting on one.
    let deadline_ms = match run.scheduler.as_ref() {
        Some(meta) if meta.queue_reason == Some(QueueReason::RetryBackoff) => meta.retry_after_ms,
        _ => None,
    };
    match deadline_ms {
        Some(deadline_ms) => deadline_ms <= now_ms,
        None => true,
    }
}

/// Returns how many milliseconds remain until the run's retry backoff elapses.
///
/// Yields `None` when the run has no scheduler metadata, when its queue reason
/// is not `RetryBackoff`, or when no `retry_after_ms` is recorded. Otherwise
/// the result is `retry_after_ms - now_ms`, clamped at zero once the deadline
/// has passed.
pub(crate) fn retry_backoff_remaining_ms(run: &AutomationV2RunRecord, now_ms: u64) -> Option<u64> {
    run.scheduler
        .as_ref()
        .filter(|meta| meta.queue_reason == Some(QueueReason::RetryBackoff))
        .and_then(|meta| meta.retry_after_ms)
        .map(|deadline_ms| deadline_ms.saturating_sub(now_ms))
}

/// Builds the scheduler metadata describing a retry backoff for `node_id`.
///
/// Returns `None` when `retry_decision` carries no `backoff_ms`. Otherwise:
///
/// * `retry_after_ms` is the decision's `next_retry_at_ms` when present, and
///   `now_ms + backoff_ms` (saturating) when it is not;
/// * `retry_attempt` is `attempts + 1` (saturating);
/// * `retry_reason` is `detail` passed through `truncate_text` with a limit
///   argument of 500;
/// * `queued_at_ms` is `now_ms`, and the tenant context is cloned from `run`.
pub(crate) fn retry_backoff_scheduler_metadata(
    run: &AutomationV2RunRecord,
    node_id: &str,
    attempts: u32,
    retry_decision: &tandem_automation::RetryDecision,
    detail: &str,
    now_ms: u64,
) -> Option<SchedulerMetadata> {
    // Without a backoff duration there is nothing to schedule.
    let backoff_ms = retry_decision.backoff_ms?;

    // Prefer the absolute deadline from the decision; fall back to a relative one.
    let due_at_ms = match retry_decision.next_retry_at_ms {
        Some(at_ms) => at_ms,
        None => now_ms.saturating_add(backoff_ms),
    };
    let node_resource_key = format!(
        "automation://{}/runs/{}/nodes/{node_id}",
        run.automation_id, run.run_id
    );
    let upcoming_attempt = attempts.saturating_add(1);
    let reason = crate::app::state::truncate_text(detail, 500);

    Some(SchedulerMetadata {
        tenant_context: run.tenant_context.clone(),
        queue_reason: Some(QueueReason::RetryBackoff),
        resource_key: Some(node_resource_key),
        rate_limited_provider: None,
        queued_at_ms: now_ms,
        retry_node_id: Some(node_id.to_owned()),
        retry_attempt: Some(upcoming_attempt),
        retry_backoff_ms: Some(backoff_ms),
        retry_after_ms: Some(due_at_ms),
        retry_reason: Some(reason),
    })
}

/// Stores a retry-backoff request in `pending` unless one is already there.
///
/// The first recorded request wins: when `pending` already holds a value the
/// call leaves it untouched. Otherwise the arguments are copied into a new
/// owned `RetryBackoffRequest`.
pub(crate) fn record_pending_retry_backoff(
    pending: &mut Option<RetryBackoffRequest>,
    node_id: &str,
    attempts: u32,
    max_attempts: u32,
    retry_decision: &tandem_automation::RetryDecision,
    detail: &str,
    backoff_ms: u64,
) {
    if pending.is_none() {
        *pending = Some(RetryBackoffRequest {
            node_id: node_id.to_owned(),
            attempts,
            max_attempts,
            retry_decision: retry_decision.clone(),
            detail: detail.to_owned(),
            backoff_ms,
        });
    }
}

/// Moves a running run into the queued retry-backoff state.
///
/// The transition only happens when `run` is currently `Running` and its
/// `execution_claim_epoch` equals `expected_execution_claim_epoch`; otherwise
/// the run is left untouched and `None` is returned. `None` is also returned,
/// without modifying the run, when `retry_backoff_scheduler_metadata` produces
/// no metadata for `retry_decision`.
///
/// On success the run is updated via `apply_retry_backoff_queue` and the
/// metadata that was stored on it is returned.
pub(crate) fn queue_run_for_retry_backoff(
    run: &mut AutomationV2RunRecord,
    node_id: &str,
    attempts: u32,
    max_attempts: u32,
    retry_decision: &tandem_automation::RetryDecision,
    detail: &str,
    expected_execution_claim_epoch: u64,
    now_ms: u64,
) -> Option<SchedulerMetadata> {
    // Guard against re-queueing a run that was stopped or claimed under a
    // different epoch in the meantime.
    let claim_still_valid = run.status == AutomationRunStatus::Running
        && run.execution_claim_epoch == expected_execution_claim_epoch;
    if !claim_still_valid {
        return None;
    }

    let scheduler_metadata =
        retry_backoff_scheduler_metadata(run, node_id, attempts, retry_decision, detail, now_ms)?;
    apply_retry_backoff_queue(
        run,
        &scheduler_metadata,
        node_id,
        attempts,
        max_attempts,
        retry_decision,
        detail,
    );
    Some(scheduler_metadata)
}

/// Rewrites `run` into the queued retry-backoff state and records the event.
///
/// Sets the status to `Queued`, stores a copy of `metadata` as the scheduler
/// metadata, clears the finish timestamp, stop/pause fields and the active
/// session/instance id lists, sets `resume_reason` to
/// `"retry_backoff_scheduled"`, and writes a human-readable `detail`. Finally
/// a `node_retry_backoff_scheduled` lifecycle event is recorded with a JSON
/// payload describing the retry.
///
/// The backoff duration defaults to zero and the retry deadline to
/// `metadata.queued_at_ms` when the metadata does not carry them.
fn apply_retry_backoff_queue(
    run: &mut AutomationV2RunRecord,
    metadata: &SchedulerMetadata,
    node_id: &str,
    attempts: u32,
    max_attempts: u32,
    retry_decision: &tandem_automation::RetryDecision,
    detail: &str,
) {
    let backoff_ms = metadata.retry_backoff_ms.unwrap_or_default();
    let due_at_ms = metadata.retry_after_ms.unwrap_or(metadata.queued_at_ms);
    let next_attempt = attempts.saturating_add(1);

    let run_detail = format!(
        "retrying node `{node_id}` after transient provider failure; queued for retry in {backoff_ms} ms before attempt {next_attempt}/{max_attempts}: {detail}"
    );
    let event_message = format!("node `{node_id}` retry scheduled after {backoff_ms} ms");

    // Scheduling state.
    run.status = AutomationRunStatus::Queued;
    run.scheduler = Some(metadata.clone());
    run.resume_reason = Some("retry_backoff_scheduled".to_string());

    // Anything that marked the run as finished, stopped or paused no longer applies.
    run.finished_at_ms = None;
    run.stop_kind = None;
    run.stop_reason = None;
    run.pause_reason = None;

    // Nothing is executing while the run waits in the queue.
    run.active_session_ids.clear();
    run.active_instance_ids.clear();

    run.detail = Some(run_detail);

    let event_payload = json!({
        "node_id": node_id,
        "attempt": attempts,
        "next_attempt": next_attempt,
        "max_attempts": max_attempts,
        "backoff_ms": backoff_ms,
        "retry_after_ms": due_at_ms,
        "reason": detail,
        "retry_decision": retry_decision,
    });
    crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
        run,
        "node_retry_backoff_scheduled",
        Some(event_message),
        None,
        Some(event_payload),
    );
}

/// Applies a previously recorded retry-backoff request to the stored run.
///
/// Returns `false` immediately when `pending` is `None`. Otherwise the run
/// identified by `run_id` is updated through
/// `AppState::update_automation_v2_run`, calling `queue_run_for_retry_backoff`
/// with the current time and `expected_execution_claim_epoch`.
///
/// The return value is derived from the row handed back by the update, not
/// from the result of the queueing call: it is `true` when that row is
/// `Queued` with a `RetryBackoff` queue reason for the requested node. An
/// info-level log line is emitted in that case.
pub(crate) async fn queue_pending_retry_backoff(
    state: &AppState,
    run_id: &str,
    expected_execution_claim_epoch: u64,
    pending: Option<RetryBackoffRequest>,
) -> bool {
    let Some(request) = pending else {
        return false;
    };

    let now_ms = crate::util::time::now_ms();
    let updated_row = state
        .update_automation_v2_run(run_id, |row| {
            // The outcome is re-derived from the returned row below, so the
            // metadata returned here is intentionally dropped.
            let _ = queue_run_for_retry_backoff(
                row,
                &request.node_id,
                request.attempts,
                request.max_attempts,
                &request.retry_decision,
                &request.detail,
                expected_execution_claim_epoch,
                now_ms,
            );
        })
        .await;

    // NOTE(review): this inspects the row's final state, so it would also match
    // a row already queued for the same node by someone else — confirm that is
    // acceptable for callers.
    let queued_for_node = updated_row
        .as_ref()
        .filter(|row| row.status == AutomationRunStatus::Queued)
        .and_then(|row| row.scheduler.as_ref())
        .is_some_and(|metadata| {
            metadata.queue_reason == Some(QueueReason::RetryBackoff)
                && metadata.retry_node_id.as_deref() == Some(request.node_id.as_str())
        });
    if !queued_for_node {
        return false;
    }

    tracing::info!(
        run_id = %run_id,
        node_id = %request.node_id,
        backoff_ms = request.backoff_ms,
        next_attempt = request.attempts.saturating_add(1),
        max_attempts = request.max_attempts,
        "automation node retry queued for durable backoff"
    );
    true
}

// Unit tests for the retry-backoff scheduling helpers defined in this module.
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Builds a queued run whose scheduler metadata describes a retry backoff
    /// for `node-a` that becomes due at `retry_after_ms`.
    fn run_with_retry_after(retry_after_ms: u64) -> AutomationV2RunRecord {
        AutomationV2RunRecord {
            run_id: "run-retry".to_string(),
            automation_id: "automation-retry".to_string(),
            tenant_context: tandem_types::TenantContext::local_implicit(),
            trigger_type: "manual".to_string(),
            status: AutomationRunStatus::Queued,
            created_at_ms: 1,
            updated_at_ms: 1,
            started_at_ms: Some(1),
            finished_at_ms: None,
            active_session_ids: Vec::new(),
            latest_session_id: None,
            active_instance_ids: Vec::new(),
            checkpoint: tandem_automation::AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes: vec!["node-a".to_string()],
                node_outputs: Default::default(),
                node_attempts: Default::default(),
                node_attempt_verdicts: Default::default(),
                blocked_nodes: Vec::new(),
                awaiting_gate: None,
                gate_history: Vec::new(),
                lifecycle_history: Vec::new(),
                last_failure: None,
            },
            runtime_context: None,
            automation_snapshot: None,
            workflow_definition_version: None,
            workflow_definition_snapshot_hash: None,
            execution_claim: None,
            execution_claim_epoch: 0,
            pause_reason: None,
            resume_reason: None,
            detail: None,
            stop_kind: None,
            stop_reason: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
            scheduler: Some(SchedulerMetadata {
                tenant_context: tandem_types::TenantContext::local_implicit(),
                queue_reason: Some(QueueReason::RetryBackoff),
                resource_key: Some(
                    "automation://automation-retry/runs/run-retry/nodes/node-a".to_string(),
                ),
                rate_limited_provider: None,
                queued_at_ms: 100,
                retry_node_id: Some("node-a".to_string()),
                retry_attempt: Some(2),
                retry_backoff_ms: Some(500),
                retry_after_ms: Some(retry_after_ms),
                retry_reason: Some("provider timeout".to_string()),
            }),
            trigger_reason: None,
            consumed_handoff_id: None,
            learning_summary: None,
            effective_execution_profile: tandem_automation::ExecutionProfile::Strict,
            requested_execution_profile: None,
        }
    }

    /// Builds the retry decision shared by the queueing tests: attempt 1 of 3
    /// with a 500 ms backoff and an absolute retry time of 1_500.
    fn provider_timeout_retry_decision() -> tandem_automation::RetryDecision {
        tandem_automation::RetryDecision {
            version: 1,
            policy_version_id: "policy".to_string(),
            decision: "retry_scheduled".to_string(),
            failure_class: "provider_transient".to_string(),
            reason: "provider timeout".to_string(),
            attempt: 1,
            max_attempts: 3,
            retryable: true,
            terminal: false,
            next_retry_at_ms: Some(1_500),
            backoff_ms: Some(500),
            terminal_behavior: tandem_automation::RetryTerminalBehavior::default(),
            manual_override_allowed: true,
        }
    }

    #[test]
    fn retry_backoff_due_checks_scheduler_due_time() {
        let run = run_with_retry_after(1_500);

        assert!(!retry_backoff_is_due(&run, 1_499));
        assert_eq!(retry_backoff_remaining_ms(&run, 1_000), Some(500));
        assert!(retry_backoff_is_due(&run, 1_500));
        assert_eq!(retry_backoff_remaining_ms(&run, 1_600), Some(0));
    }

    #[test]
    fn retry_backoff_metadata_survives_json_roundtrip() {
        let run = run_with_retry_after(1_500);
        let encoded = serde_json::to_value(&run).expect("serialize");
        let decoded: AutomationV2RunRecord = serde_json::from_value(encoded).expect("deserialize");
        let scheduler = decoded.scheduler.expect("scheduler");

        assert_eq!(scheduler.queue_reason, Some(QueueReason::RetryBackoff));
        assert_eq!(scheduler.retry_node_id.as_deref(), Some("node-a"));
        assert_eq!(scheduler.retry_attempt, Some(2));
        assert_eq!(scheduler.retry_backoff_ms, Some(500));
        assert_eq!(scheduler.retry_after_ms, Some(1_500));
        assert_eq!(scheduler.retry_reason.as_deref(), Some("provider timeout"));
    }

    #[test]
    fn queue_run_for_retry_backoff_records_durable_retry_state() {
        let mut run = run_with_retry_after(100);
        run.status = AutomationRunStatus::Running;
        run.scheduler = None;
        run.active_session_ids.push("session-a".to_string());
        let decision = provider_timeout_retry_decision();
        let expected_epoch = run.execution_claim_epoch;

        let metadata = queue_run_for_retry_backoff(
            &mut run,
            "node-a",
            1,
            3,
            &decision,
            "provider timeout",
            expected_epoch,
            1_000,
        )
        .expect("metadata");

        assert_eq!(run.status, AutomationRunStatus::Queued);
        assert!(run.active_session_ids.is_empty());
        assert_eq!(metadata.retry_after_ms, Some(1_500));
        assert_eq!(metadata.retry_attempt, Some(2));
        assert_eq!(run.scheduler, Some(metadata));
        assert!(run
            .checkpoint
            .lifecycle_history
            .iter()
            .any(|event| event.event == "node_retry_backoff_scheduled"));
        assert_eq!(
            json!(run.scheduler.unwrap().queue_reason),
            json!("retry_backoff")
        );
    }

    #[test]
    fn queue_run_for_retry_backoff_does_not_resurrect_stopped_run() {
        let mut run = run_with_retry_after(100);
        run.status = AutomationRunStatus::Cancelled;
        run.scheduler = None;
        let decision = provider_timeout_retry_decision();
        let expected_epoch = run.execution_claim_epoch;

        assert_eq!(
            queue_run_for_retry_backoff(
                &mut run,
                "node-a",
                1,
                3,
                &decision,
                "provider timeout",
                expected_epoch,
                1_000,
            ),
            None
        );
        assert_eq!(run.status, AutomationRunStatus::Cancelled);
        assert_eq!(run.scheduler, None);
    }

    #[test]
    fn queue_run_for_retry_backoff_rejects_stale_claim_epoch() {
        let mut run = run_with_retry_after(100);
        run.status = AutomationRunStatus::Running;
        run.scheduler = None;
        let decision = provider_timeout_retry_decision();
        // An epoch that differs from the run's current claim epoch must not
        // be allowed to re-queue the run.
        let stale_epoch = run.execution_claim_epoch + 1;

        assert_eq!(
            queue_run_for_retry_backoff(
                &mut run,
                "node-a",
                1,
                3,
                &decision,
                "provider timeout",
                stale_epoch,
                1_000,
            ),
            None
        );
        assert_eq!(run.status, AutomationRunStatus::Running);
        assert_eq!(run.scheduler, None);
    }
}