tandem-server 0.6.6

HTTP server for Tandem engine APIs
use super::*;

/// Slack (one minute, in milliseconds) added on top of the longest in-progress
/// node timeout when deciding whether a running automation has gone stale.
const AUTOMATION_STALE_NODE_TIMEOUT_GRACE_MS: u64 = 60 * 1_000;

/// Returns how long (in ms) `run` may go without activity before it is treated
/// as stale.
///
/// When the run carries no automation snapshot, `default_stale_after_ms` is
/// returned unchanged. Otherwise each in-progress node id is resolved against
/// the snapshot's flow nodes and its effective timeout is taken. The result is
/// the larger of `default_stale_after_ms` and the longest such timeout plus
/// `AUTOMATION_STALE_NODE_TIMEOUT_GRACE_MS` (saturating).
///
/// If no in-progress node resolves, the longest timeout counts as 0. In that
/// case the grace period alone still acts as a lower bound whenever a snapshot
/// is present.
fn automation_run_effective_stale_after_ms(
    run: &AutomationV2RunRecord,
    default_stale_after_ms: u64,
) -> u64 {
    let snapshot = match run.automation_snapshot.as_ref() {
        Some(snapshot) => snapshot,
        None => return default_stale_after_ms,
    };
    let in_progress_ids = automation::lifecycle::automation_in_progress_node_ids(run);
    let mut longest_node_timeout_ms = 0u64;
    for node_id in in_progress_ids.iter() {
        let resolved = snapshot
            .flow
            .nodes
            .iter()
            .find(|node| &node.node_id == node_id);
        if let Some(node) = resolved {
            longest_node_timeout_ms = longest_node_timeout_ms
                .max(automation::effective_automation_node_timeout_ms(node));
        }
    }
    let padded_timeout_ms =
        longest_node_timeout_ms.saturating_add(AUTOMATION_STALE_NODE_TIMEOUT_GRACE_MS);
    default_stale_after_ms.max(padded_timeout_ms)
}

impl AppState {
    /// Returns the latest activity timestamp (milliseconds) observed for `run`.
    ///
    /// Starts from `automation::lifecycle::automation_last_activity_at_ms(run)`.
    /// It is then raised by two sources for every id in `run.active_session_ids`:
    /// - the stored session's `time.updated` (negative timestamps clamp to 0);
    /// - the run registry's `last_activity_at_ms`, but only when the registered
    ///   run's `run_id` matches this run.
    async fn automation_run_last_activity_at_ms(&self, run: &AutomationV2RunRecord) -> u64 {
        let mut last_activity_at_ms = automation::lifecycle::automation_last_activity_at_ms(run);
        for session_id in &run.active_session_ids {
            if let Some(session) = self.storage.get_session(session_id).await {
                last_activity_at_ms = last_activity_at_ms.max(
                    session
                        .time
                        .updated
                        .timestamp_millis()
                        .max(0)
                        .try_into()
                        .unwrap_or_default(),
                );
            }
            // Only count registry activity that is attributed to this very run.
            if let Some(active_run) = self.run_registry.get(session_id).await {
                if active_run.run_id == run.run_id {
                    last_activity_at_ms = last_activity_at_ms.max(active_run.last_activity_at_ms);
                }
            }
        }
        last_activity_at_ms
    }

    /// Pauses or fails `Running` automation runs that have been idle for too long.
    /// Returns the number of runs that were actually transitioned.
    ///
    /// First calls `reclaim_abandoned_automation_v2_run_leases` on a best-effort
    /// basis; its result is ignored.
    ///
    /// Candidates are `Running` runs without an unexpired launch claim that lacks
    /// progress. A candidate is stale once the time since its last activity (see
    /// `automation_run_last_activity_at_ms`) reaches
    /// `automation_run_effective_stale_after_ms(run, stale_after_ms)`.
    ///
    /// For each stale run:
    /// - each active session is cancelled and `run_registry.finish_if_match` is
    ///   called for it;
    /// - its agent-team instances are cancelled and its sessions are forgotten;
    /// - every in-progress node without an output receives an error output.
    ///
    /// If any such node has `attempts >= automation_node_max_attempts(node)`, the
    /// run becomes `Failed`. Otherwise it becomes `Paused` with pause reason
    /// `stale_no_provider_activity`.
    ///
    /// Each reaped run emits an internal-sweep audit event and an engine event.
    /// Failed runs additionally emit a failure event.
    ///
    /// A run whose stored status is no longer `Running` when its record is updated
    /// is left untouched and not counted. This happens when it changed state while
    /// this sweep was awaiting.
    pub async fn reap_stale_running_automation_runs(&self, stale_after_ms: u64) -> usize {
        // Best-effort housekeeping; a failure here must not block the sweep.
        let _ = self.reclaim_abandoned_automation_v2_run_leases().await;
        let now = now_ms();
        let candidate_runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .filter(|run| run.status == AutomationRunStatus::Running)
            .filter(|run| {
                !automation_v2_run_claims::run_has_unexpired_launch_claim_without_progress(run, now)
            })
            .cloned()
            .collect::<Vec<_>>();
        let mut runs = Vec::new();
        for run in candidate_runs {
            let last_activity_at_ms = self.automation_run_last_activity_at_ms(&run).await;
            let effective_stale_after_ms =
                automation_run_effective_stale_after_ms(&run, stale_after_ms);
            if now.saturating_sub(last_activity_at_ms) >= effective_stale_after_ms {
                runs.push((run, effective_stale_after_ms));
            }
        }
        let mut reaped = 0usize;
        for (run, effective_stale_after_ms) in runs {
            let run_id = run.run_id.clone();
            let session_ids = run.active_session_ids.clone();
            let instance_ids = run.active_instance_ids.clone();
            let stale_node_ids = automation::lifecycle::automation_in_progress_node_ids(&run);
            let detail = format!(
                "automation run paused after no provider activity for at least {}s",
                effective_stale_after_ms / 1000
            );
            for session_id in &session_ids {
                let _ = self.cancellations.cancel(session_id).await;
                let _ = self.run_registry.finish_if_match(session_id, &run_id).await;
            }
            for instance_id in instance_ids {
                let _ = self
                    .agent_teams
                    .cancel_instance(self, &instance_id, "paused by stale-run reaper")
                    .await;
            }
            self.forget_automation_v2_sessions(&session_ids).await;
            let automation_name = run
                .automation_snapshot
                .as_ref()
                .map(|automation| automation.name.clone());
            let mut terminal_stale_node_ids = Vec::new();
            // Set by the update closure only when it actually transitions the run.
            let mut transitioned = false;
            let updated_run = self
                .update_automation_v2_run(&run_id, |row| {
                    // `run` was snapshotted before several awaits above. If the stored
                    // run has meanwhile left `Running`, keep its newer status instead
                    // of clobbering it with a stale-reap result.
                    if row.status != AutomationRunStatus::Running {
                        return;
                    }
                    transitioned = true;
                    let stale_node_detail = format!(
                        "node execution stalled after no provider activity for at least {}s",
                        effective_stale_after_ms / 1000
                    );
                    let automation_snapshot = row.automation_snapshot.clone();
                    let mut annotated_nodes = Vec::new();
                    let mut terminal_nodes = Vec::new();
                    if let Some(automation) = automation_snapshot.as_ref() {
                        for node_id in &stale_node_ids {
                            // Nodes that already produced an output are left as-is.
                            if row.checkpoint.node_outputs.contains_key(node_id) {
                                continue;
                            }
                            let Some(node) = automation
                                .flow
                                .nodes
                                .iter()
                                .find(|candidate| &candidate.node_id == node_id)
                            else {
                                continue;
                            };
                            let attempts =
                                row.checkpoint.node_attempts.get(node_id).copied().unwrap_or(1);
                            let max_attempts = automation_node_max_attempts(node);
                            // A node that has used up its attempts cannot be retried.
                            let terminal = attempts >= max_attempts;
                            if terminal {
                                terminal_nodes.push(node_id.clone());
                            }
                            row.checkpoint.node_outputs.insert(
                                node_id.clone(),
                                crate::automation_v2::executor::build_node_execution_error_output_with_category(
                                    node,
                                    &stale_node_detail,
                                    terminal,
                                    "stale_no_provider_activity",
                                ),
                            );
                            // Preserve an earlier recorded failure if there is one.
                            if row.checkpoint.last_failure.is_none() {
                                row.checkpoint.last_failure = Some(
                                    crate::automation_v2::types::AutomationFailureRecord {
                                        node_id: node_id.clone(),
                                        reason: stale_node_detail.clone(),
                                        failed_at_ms: now_ms(),
                                        failure_kind: Some(
                                            "stale_no_provider_activity".to_string(),
                                        ),
                                        metadata: None,
                                    },
                                );
                            }
                            annotated_nodes.push(node_id.clone());
                        }
                    }
                    let terminal = !terminal_nodes.is_empty();
                    let run_detail = if terminal {
                        format!(
                            "automation run failed after no provider activity for at least {}s; terminal stale node(s): {}",
                            effective_stale_after_ms / 1000,
                            terminal_nodes.join(", ")
                        )
                    } else if annotated_nodes.is_empty() {
                        detail.clone()
                    } else {
                        format!(
                            "{}; repairable node(s): {}",
                            detail,
                            annotated_nodes.join(", ")
                        )
                    };
                    terminal_stale_node_ids = terminal_nodes;
                    row.status = if terminal {
                        AutomationRunStatus::Failed
                    } else {
                        AutomationRunStatus::Paused
                    };
                    row.pause_reason = if terminal {
                        None
                    } else {
                        Some("stale_no_provider_activity".to_string())
                    };
                    row.detail = Some(run_detail.clone());
                    row.stop_kind = Some(AutomationStopKind::StaleReaped);
                    row.stop_reason = Some(run_detail.clone());
                    row.active_session_ids.clear();
                    row.latest_session_id = None;
                    row.active_instance_ids.clear();
                    automation::record_automation_lifecycle_event(
                        row,
                        if terminal {
                            "run_failed_stale_no_provider_activity"
                        } else {
                            "run_paused_stale_no_provider_activity"
                        },
                        Some(run_detail),
                        Some(AutomationStopKind::StaleReaped),
                    );
                    if let Some(automation) = automation_snapshot.as_ref() {
                        automation::refresh_automation_runtime_state(automation, row);
                    }
                })
                .await;
            let Some(updated_run) = updated_run else {
                continue;
            };
            // The run changed state concurrently; nothing was reaped.
            if !transitioned {
                continue;
            }
            let terminal = updated_run.status == AutomationRunStatus::Failed;
            if terminal {
                if let Some(automation) = updated_run.automation_snapshot.as_ref() {
                    crate::automation_v2::executor::publish_automation_v2_failure_event(
                        self,
                        automation,
                        &updated_run,
                    );
                }
            }
            self.append_internal_sweep_protected_audit_event(
                if terminal {
                    "automation_v2.internal_sweep.failed_stale_run"
                } else {
                    "automation_v2.internal_sweep.paused_stale_run"
                },
                &updated_run,
                "reap_stale_running_automation_runs",
                if terminal { "failed" } else { "paused" },
                updated_run.detail.clone().or_else(|| Some(detail.clone())),
                json!({
                    "stale_node_ids": stale_node_ids.clone(),
                    "terminal_stale_node_ids": terminal_stale_node_ids.clone(),
                    "stale_after_ms": effective_stale_after_ms,
                }),
            )
            .await;
            self.event_bus.publish(EngineEvent::new(
                if terminal {
                    "automation_v2.run.failed_stale_no_provider_activity"
                } else {
                    "automation_v2.run.paused_stale_no_provider_activity"
                },
                json!({
                    "automation_id": run.automation_id,
                    "automationID": run.automation_id,
                    "workflow_id": run.automation_id,
                    "workflowID": run.automation_id,
                    "workflow_name": automation_name,
                    "run_id": run_id,
                    "runID": run_id,
                    "source": "automation_v2",
                    "component": "automation_v2",
                    "status": if terminal { "failed" } else { "paused" },
                    "pause_reason": if terminal { Value::Null } else { json!("stale_no_provider_activity") },
                    "reason": updated_run.detail.clone().unwrap_or_else(|| detail.clone()),
                    "detail": updated_run.detail.clone().unwrap_or_else(|| detail.clone()),
                    "stale_node_ids": stale_node_ids,
                    "terminal_stale_node_ids": terminal_stale_node_ids,
                    "stale_after_ms": effective_stale_after_ms,
                    "tenantContext": updated_run.tenant_context,
                }),
            ));
            reaped += 1;
        }
        reaped
    }
}