Skip to main content

tandem_server/
workflows.rs

1use anyhow::Context;
2use serde_json::{json, Value};
3use std::time::Duration;
4use tandem_types::{EngineEvent, MessagePartInput, SendMessageRequest, Session, TenantContext};
5use tandem_workflows::{
6    WorkflowActionRunRecord, WorkflowActionRunStatus, WorkflowActionSpec, WorkflowHookBinding,
7    WorkflowRunRecord, WorkflowRunStatus, WorkflowSimulationResult, WorkflowSpec,
8};
9use uuid::Uuid;
10
11use crate::{now_ms, AppState, WorkflowSourceRef};
12
13#[derive(Debug, Clone)]
14struct PreparedWorkflowAction {
15    action_id: String,
16    spec: WorkflowActionSpec,
17}
18
19fn workflow_event_tenant_context_value(tenant_context: &TenantContext) -> Value {
20    serde_json::to_value(tenant_context).unwrap_or_else(|_| json!(tenant_context))
21}
22
23fn workflow_event_with_tenant_context(mut payload: Value, tenant_context: &TenantContext) -> Value {
24    if let Some(map) = payload.as_object_mut() {
25        map.entry("tenantContext".to_string())
26            .or_insert_with(|| workflow_event_tenant_context_value(tenant_context));
27    }
28    payload
29}
30
31fn event_tenant_context(event: &EngineEvent) -> TenantContext {
32    event
33        .properties
34        .get("tenantContext")
35        .and_then(|value| serde_json::from_value(value.clone()).ok())
36        .unwrap_or_else(TenantContext::local_implicit)
37}
38
39fn workflow_action_objective(action: &str, with: Option<&Value>) -> String {
40    match with {
41        Some(with) if !with.is_null() => {
42            format!("Execute workflow action `{action}` with payload {with}.")
43        }
44        _ => format!("Execute workflow action `{action}`."),
45    }
46}
47
48fn workflow_manual_schedule() -> crate::AutomationV2Schedule {
49    crate::AutomationV2Schedule {
50        schedule_type: crate::AutomationV2ScheduleType::Manual,
51        cron_expression: None,
52        interval_seconds: None,
53        timezone: "UTC".to_string(),
54        misfire_policy: crate::RoutineMisfirePolicy::RunOnce,
55    }
56}
57
58fn workflow_execution_plan(
59    workflow_id: &str,
60    name: &str,
61    description: Option<String>,
62    actions: &[PreparedWorkflowAction],
63    source_label: &str,
64    source: Option<&WorkflowSourceRef>,
65    trigger_event: Option<&str>,
66) -> crate::WorkflowPlan {
67    crate::WorkflowPlan {
68        plan_id: format!("workflow-plan-{workflow_id}"),
69        planner_version: "workflow_runtime_v1".to_string(),
70        plan_source: source_label.to_string(),
71        original_prompt: description.clone().unwrap_or_else(|| name.to_string()),
72        normalized_prompt: description.clone().unwrap_or_else(|| name.to_string()),
73        confidence: "high".to_string(),
74        title: name.to_string(),
75        description,
76        schedule: workflow_manual_schedule(),
77        execution_target: "automation_v2".to_string(),
78        workspace_root: std::env::current_dir()
79            .unwrap_or_else(|_| std::path::PathBuf::from("."))
80            .to_string_lossy()
81            .to_string(),
82        steps: actions
83            .iter()
84            .map(|action| crate::WorkflowPlanStep {
85                step_id: action.action_id.clone(),
86                kind: "workflow_action".to_string(),
87                objective: workflow_action_objective(
88                    &action.spec.action,
89                    action.spec.with.as_ref(),
90                ),
91                depends_on: Vec::new(),
92                agent_role: "operator".to_string(),
93                input_refs: Vec::new(),
94                output_contract: Some(crate::AutomationFlowOutputContract {
95                    kind: "generic_artifact".to_string(),
96                    validator: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
97                    enforcement: None,
98                    schema: None,
99                    summary_guidance: None,
100                }),
101                metadata: None,
102            })
103            .collect(),
104        requires_integrations: Vec::new(),
105        allowed_mcp_servers: Vec::new(),
106        operator_preferences: Some(json!({
107            "source": source_label,
108            "tool_access_mode": "auto",
109        })),
110        save_options: json!({
111            "origin": source_label,
112            "workflow_source": source,
113            "trigger_event": trigger_event,
114        }),
115    }
116}
117
118pub(crate) fn compile_workflow_spec_to_automation_preview(
119    workflow: &WorkflowSpec,
120) -> crate::AutomationV2Spec {
121    let actions = workflow
122        .steps
123        .iter()
124        .map(|step| PreparedWorkflowAction {
125            action_id: step.step_id.clone(),
126            spec: WorkflowActionSpec {
127                action: step.action.clone(),
128                with: step.with.clone(),
129            },
130        })
131        .collect::<Vec<_>>();
132    let mut automation = crate::http::compile_plan_to_automation_v2(
133        &workflow_execution_plan(
134            &workflow.workflow_id,
135            &workflow.name,
136            workflow.description.clone(),
137            &actions,
138            "workflow_registry",
139            workflow.source.as_ref(),
140            None,
141        ),
142        None,
143        "workflow_registry",
144    );
145    if let Some(metadata) = automation.metadata.as_mut().and_then(Value::as_object_mut) {
146        metadata.insert("workflow_id".to_string(), json!(workflow.workflow_id));
147        metadata.insert("workflow_name".to_string(), json!(workflow.name));
148        metadata.insert("workflow_source".to_string(), json!(workflow.source));
149        metadata.insert("workflow_enabled".to_string(), json!(workflow.enabled));
150    }
151    automation
152}
153
154fn compile_workflow_run_automation(
155    workflow_id: &str,
156    workflow_name: Option<&str>,
157    workflow_description: Option<&str>,
158    binding_id: Option<&str>,
159    actions: &[PreparedWorkflowAction],
160    source: Option<&WorkflowSourceRef>,
161    trigger_event: Option<&str>,
162) -> crate::AutomationV2Spec {
163    let automation_id = binding_id
164        .map(|binding| format!("workflow-hook-automation-{workflow_id}-{binding}"))
165        .unwrap_or_else(|| format!("workflow-automation-{workflow_id}"));
166    let title = binding_id
167        .map(|binding| {
168            workflow_name
169                .map(|name| format!("{name} hook {binding}"))
170                .unwrap_or_else(|| format!("Workflow Hook {workflow_id}:{binding}"))
171        })
172        .unwrap_or_else(|| {
173            workflow_name
174                .map(|name| format!("{name} execution"))
175                .unwrap_or_else(|| format!("Workflow {workflow_id}"))
176        });
177    let mut automation = crate::http::compile_plan_to_automation_v2(
178        &workflow_execution_plan(
179            &automation_id,
180            &title,
181            Some(
182                workflow_description
183                    .map(|description| description.to_string())
184                    .unwrap_or_else(|| format!("Mirrored workflow execution for `{workflow_id}`.")),
185            ),
186            actions,
187            "workflow_runtime",
188            source,
189            trigger_event,
190        ),
191        None,
192        "workflow_runtime",
193    );
194    automation.automation_id = automation_id.clone();
195    automation.name = title;
196    automation.metadata = Some(json!({
197        "workflow_id": workflow_id,
198        "binding_id": binding_id,
199        "workflow_source": source,
200        "trigger_event": trigger_event,
201        "origin": "workflow_runtime_mirror",
202    }));
203    automation
204}
205
206async fn sync_workflow_automation_run_start(
207    state: &AppState,
208    automation: &crate::AutomationV2Spec,
209    run_id: &str,
210) -> anyhow::Result<crate::AutomationV2RunRecord> {
211    let updated = state
212        .update_automation_v2_run(run_id, |run| {
213            run.status = crate::AutomationRunStatus::Running;
214            run.started_at_ms.get_or_insert_with(now_ms);
215            crate::app::state::automation::lifecycle::record_automation_lifecycle_event(
216                run,
217                "workflow_run_started",
218                Some("workflow runtime mirror started".to_string()),
219                None,
220            );
221            crate::app::state::refresh_automation_runtime_state(automation, run);
222        })
223        .await
224        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
225    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
226        .await
227        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
228    Ok(updated)
229}
230
231async fn sync_workflow_automation_action_started(
232    state: &AppState,
233    automation: &crate::AutomationV2Spec,
234    run_id: &str,
235    action_id: &str,
236) -> anyhow::Result<crate::AutomationV2RunRecord> {
237    let updated = state
238        .update_automation_v2_run(run_id, |run| {
239            let next_attempt = run
240                .checkpoint
241                .node_attempts
242                .get(action_id)
243                .copied()
244                .unwrap_or(0)
245                .saturating_add(1);
246            run.checkpoint
247                .node_attempts
248                .insert(action_id.to_string(), next_attempt);
249            run.detail = Some(format!("Running workflow action `{action_id}`"));
250            crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
251                run,
252                "workflow_action_started",
253                Some(format!("workflow action `{action_id}` started")),
254                None,
255                Some(json!({
256                    "action_id": action_id,
257                    "attempt": next_attempt,
258                })),
259            );
260            crate::app::state::refresh_automation_runtime_state(automation, run);
261        })
262        .await
263        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
264    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
265        .await
266        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
267    Ok(updated)
268}
269
270async fn sync_workflow_automation_action_completed(
271    state: &AppState,
272    automation: &crate::AutomationV2Spec,
273    run_id: &str,
274    action_id: &str,
275    output: &Value,
276) -> anyhow::Result<crate::AutomationV2RunRecord> {
277    let action_count = automation.flow.nodes.len();
278    let updated = state
279        .update_automation_v2_run(run_id, |run| {
280            run.checkpoint.pending_nodes.retain(|id| id != action_id);
281            if !run
282                .checkpoint
283                .completed_nodes
284                .iter()
285                .any(|id| id == action_id)
286            {
287                run.checkpoint.completed_nodes.push(action_id.to_string());
288            }
289            run.checkpoint.node_outputs.insert(
290                action_id.to_string(),
291                json!(crate::AutomationNodeOutput {
292                    contract_kind: "generic_artifact".to_string(),
293                    validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
294                    validator_summary: Some(crate::AutomationValidatorSummary {
295                        kind: crate::AutomationOutputValidatorKind::GenericArtifact,
296                        outcome: "accepted".to_string(),
297                        reason: Some("workflow action completed".to_string()),
298                        unmet_requirements: Vec::new(),
299                        warning_requirements: Vec::new(),
300                        warning_count: 0,
301                        accepted_candidate_source: Some("workflow_runtime".to_string()),
302                        verification_outcome: Some("not_applicable".to_string()),
303                        validation_basis: None,
304                        repair_attempted: false,
305                        repair_attempt: 0,
306                        repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
307                            as u32,
308                        repair_succeeded: false,
309                        repair_exhausted: false,
310                    }),
311                    summary: format!("Workflow action `{action_id}` completed"),
312                    content: json!({
313                        "action_id": action_id,
314                        "output": output,
315                    }),
316                    created_at_ms: now_ms(),
317                    node_id: action_id.to_string(),
318                    status: Some("completed".to_string()),
319                    blocked_reason: None,
320                    approved: None,
321                    workflow_class: Some("workflow_action".to_string()),
322                    phase: Some("execution".to_string()),
323                    failure_kind: None,
324                    tool_telemetry: None,
325                    preflight: None,
326                    knowledge_preflight: None,
327                    capability_resolution: None,
328                    attempt_evidence: None,
329                    blocker_category: None,
330                    fallback_used: None,
331                    artifact_validation: None,
332                    receipt_timeline: None,
333                    quality_mode: Some("strict_research_v1".to_string()),
334                    requested_quality_mode: None,
335                    emergency_rollback_enabled: Some(false),
336                    provenance: Some(crate::AutomationNodeOutputProvenance {
337                        session_id: format!("workflow-runtime-{run_id}"),
338                        node_id: action_id.to_string(),
339                        run_id: Some(run_id.to_string()),
340                        output_path: None,
341                        content_digest: None,
342                        accepted_candidate_source: Some("workflow_runtime".to_string()),
343                        validation_outcome: Some("not_applicable".to_string()),
344                        repair_attempt: Some(0),
345                        repair_succeeded: Some(false),
346                        reuse_allowed: Some(false),
347                        freshness: crate::AutomationNodeOutputFreshness {
348                            current_run: true,
349                            current_attempt: true,
350                        },
351                    }),
352                }),
353            );
354            if run.checkpoint.completed_nodes.len() >= action_count {
355                run.status = crate::AutomationRunStatus::Completed;
356                run.detail = Some("workflow runtime mirror completed".to_string());
357            }
358            crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
359                run,
360                "workflow_action_completed",
361                Some(format!("workflow action `{action_id}` completed")),
362                None,
363                Some(json!({
364                    "action_id": action_id,
365                })),
366            );
367            crate::app::state::refresh_automation_runtime_state(automation, run);
368        })
369        .await
370        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
371    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
372        .await
373        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
374    Ok(updated)
375}
376
377async fn sync_workflow_automation_action_failed(
378    state: &AppState,
379    automation: &crate::AutomationV2Spec,
380    run_id: &str,
381    action_id: &str,
382    error: &str,
383) -> anyhow::Result<crate::AutomationV2RunRecord> {
384    let updated = state
385        .update_automation_v2_run(run_id, |run| {
386            run.status = crate::AutomationRunStatus::Failed;
387            run.detail = Some(format!("Workflow action `{action_id}` failed"));
388            run.checkpoint.pending_nodes.retain(|id| id != action_id);
389            run.checkpoint.last_failure = Some(crate::AutomationFailureRecord {
390                node_id: action_id.to_string(),
391                reason: error.to_string(),
392                failed_at_ms: now_ms(),
393            });
394            run.checkpoint.node_outputs.insert(
395                action_id.to_string(),
396                json!(crate::AutomationNodeOutput {
397                    contract_kind: "generic_artifact".to_string(),
398                    validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
399                    validator_summary: Some(crate::AutomationValidatorSummary {
400                        kind: crate::AutomationOutputValidatorKind::GenericArtifact,
401                        outcome: "rejected".to_string(),
402                        reason: Some(error.to_string()),
403                        unmet_requirements: vec![error.to_string()],
404                        warning_requirements: Vec::new(),
405                        warning_count: 0,
406                        accepted_candidate_source: None,
407                        verification_outcome: Some("failed".to_string()),
408                        validation_basis: None,
409                        repair_attempted: false,
410                        repair_attempt: 0,
411                        repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
412                            as u32,
413                        repair_succeeded: false,
414                        repair_exhausted: false,
415                    }),
416                    summary: format!("Workflow action `{action_id}` failed"),
417                    content: json!({
418                        "action_id": action_id,
419                        "error": error,
420                    }),
421                    created_at_ms: now_ms(),
422                    node_id: action_id.to_string(),
423                    status: Some("failed".to_string()),
424                    blocked_reason: Some(error.to_string()),
425                    approved: None,
426                    workflow_class: Some("workflow_action".to_string()),
427                    phase: Some("execution".to_string()),
428                    failure_kind: Some("workflow_action_failed".to_string()),
429                    tool_telemetry: None,
430                    preflight: None,
431                    knowledge_preflight: None,
432                    capability_resolution: None,
433                    attempt_evidence: None,
434                    blocker_category: Some("tool_result_unusable".to_string()),
435                    fallback_used: None,
436                    artifact_validation: None,
437                    receipt_timeline: None,
438                    quality_mode: Some("strict_research_v1".to_string()),
439                    requested_quality_mode: None,
440                    emergency_rollback_enabled: Some(false),
441                    provenance: Some(crate::AutomationNodeOutputProvenance {
442                        session_id: format!("workflow-runtime-{run_id}"),
443                        node_id: action_id.to_string(),
444                        run_id: Some(run_id.to_string()),
445                        output_path: None,
446                        content_digest: None,
447                        accepted_candidate_source: None,
448                        validation_outcome: Some("failed".to_string()),
449                        repair_attempt: Some(0),
450                        repair_succeeded: Some(false),
451                        reuse_allowed: Some(false),
452                        freshness: crate::AutomationNodeOutputFreshness {
453                            current_run: true,
454                            current_attempt: true,
455                        },
456                    }),
457                }),
458            );
459            crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
460                run,
461                "workflow_action_failed",
462                Some(format!("workflow action `{action_id}` failed")),
463                None,
464                Some(json!({
465                    "action_id": action_id,
466                    "error": error,
467                })),
468            );
469            crate::app::state::refresh_automation_runtime_state(automation, run);
470        })
471        .await
472        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
473    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
474        .await
475        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
476    Ok(updated)
477}
478
479#[derive(Debug, Clone)]
480pub enum ParsedWorkflowAction {
481    EventEmit { event_type: String },
482    ResourcePut { key: String },
483    ResourcePatch { key: String },
484    ResourceDelete { key: String },
485    Tool { tool_name: String },
486    Capability { capability_id: String },
487    Workflow { workflow_id: String },
488    Agent { agent_id: String },
489}
490
491pub fn parse_workflow_action(action: &str) -> ParsedWorkflowAction {
492    let trimmed = action.trim();
493    if let Some(rest) = trimmed.strip_prefix("event:") {
494        return ParsedWorkflowAction::EventEmit {
495            event_type: rest.trim().to_string(),
496        };
497    }
498    if let Some(rest) = trimmed.strip_prefix("resource:put:") {
499        return ParsedWorkflowAction::ResourcePut {
500            key: rest.trim().to_string(),
501        };
502    }
503    if let Some(rest) = trimmed.strip_prefix("resource:patch:") {
504        return ParsedWorkflowAction::ResourcePatch {
505            key: rest.trim().to_string(),
506        };
507    }
508    if let Some(rest) = trimmed.strip_prefix("resource:delete:") {
509        return ParsedWorkflowAction::ResourceDelete {
510            key: rest.trim().to_string(),
511        };
512    }
513    if let Some(rest) = trimmed.strip_prefix("tool:") {
514        return ParsedWorkflowAction::Tool {
515            tool_name: rest.trim().to_string(),
516        };
517    }
518    if let Some(rest) = trimmed.strip_prefix("capability:") {
519        return ParsedWorkflowAction::Capability {
520            capability_id: rest.trim().to_string(),
521        };
522    }
523    if let Some(rest) = trimmed.strip_prefix("workflow:") {
524        return ParsedWorkflowAction::Workflow {
525            workflow_id: rest.trim().to_string(),
526        };
527    }
528    if let Some(rest) = trimmed.strip_prefix("agent:") {
529        return ParsedWorkflowAction::Agent {
530            agent_id: rest.trim().to_string(),
531        };
532    }
533    ParsedWorkflowAction::Capability {
534        capability_id: trimmed.to_string(),
535    }
536}
537
538pub fn canonical_workflow_event_names(event: &EngineEvent) -> Vec<String> {
539    let mut names = vec![event.event_type.clone(), event.event_type.replace('.', "_")];
540    match event.event_type.as_str() {
541        "context.task.created" => names.push("task_created".to_string()),
542        "context.task.started" => names.push("task_started".to_string()),
543        "context.task.completed" => names.push("task_completed".to_string()),
544        "context.task.failed" => names.push("task_failed".to_string()),
545        "workflow.run.started" | "routine.run.created" => {
546            names.push("workflow_started".to_string())
547        }
548        "workflow.run.completed" | "routine.run.completed" => {
549            names.push("workflow_completed".to_string())
550        }
551        "workflow.run.failed" | "routine.run.failed" => names.push("task_failed".to_string()),
552        _ => {}
553    }
554    names.sort();
555    names.dedup();
556    names
557}
558
559pub async fn simulate_workflow_event(
560    state: &AppState,
561    event: &EngineEvent,
562) -> WorkflowSimulationResult {
563    let registry = state.workflow_registry().await;
564    let canonical = canonical_workflow_event_names(event);
565    let matched_bindings = registry
566        .hooks
567        .into_iter()
568        .filter(|hook| {
569            hook.enabled
570                && canonical
571                    .iter()
572                    .any(|name| event_name_matches(&hook.event, name))
573        })
574        .collect::<Vec<_>>();
575    let planned_actions = matched_bindings
576        .iter()
577        .flat_map(|hook| hook.actions.clone())
578        .collect::<Vec<_>>();
579    WorkflowSimulationResult {
580        matched_bindings,
581        planned_actions,
582        canonical_events: canonical,
583    }
584}
585
586pub async fn dispatch_workflow_event(state: &AppState, event: &EngineEvent) {
587    let simulation = simulate_workflow_event(state, event).await;
588    if simulation.matched_bindings.is_empty() {
589        return;
590    }
591    for hook in simulation.matched_bindings {
592        let source_event_id = source_event_id(event);
593        let task_id = task_id_from_event(event);
594        let tenant_context = event_tenant_context(event);
595        let dedupe_key = format!("{}::{source_event_id}", hook.binding_id);
596        {
597            let mut seen = state.workflow_dispatch_seen.write().await;
598            if seen.contains_key(&dedupe_key) {
599                continue;
600            }
601            seen.insert(dedupe_key, now_ms());
602        }
603        let _ = execute_hook_binding(
604            state,
605            &hook,
606            tenant_context.clone(),
607            Some(event.event_type.clone()),
608            Some(source_event_id),
609            task_id,
610            false,
611        )
612        .await;
613    }
614}
615
616pub async fn run_workflow_dispatcher(state: AppState) {
617    if !state.wait_until_ready_or_failed(120, 250).await {
618        let startup = state.startup_snapshot().await;
619        tracing::warn!(
620            component = "workflow_dispatcher",
621            startup_status = ?startup.status,
622            startup_phase = %startup.phase,
623            attempt_id = %startup.attempt_id,
624            "workflow dispatcher exiting before runtime access because startup did not become ready"
625        );
626        return;
627    }
628    let mut rx = state.event_bus.subscribe();
629    loop {
630        match rx.recv().await {
631            Ok(event) => dispatch_workflow_event(&state, &event).await,
632            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
633            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
634        }
635    }
636}
637
638pub async fn execute_workflow(
639    state: &AppState,
640    workflow: &WorkflowSpec,
641    tenant_context: TenantContext,
642    trigger_event: Option<String>,
643    source_event_id: Option<String>,
644    task_id: Option<String>,
645    dry_run: bool,
646) -> anyhow::Result<WorkflowRunRecord> {
647    let actions = workflow
648        .steps
649        .iter()
650        .map(|step| PreparedWorkflowAction {
651            action_id: step.step_id.clone(),
652            spec: WorkflowActionSpec {
653                action: step.action.clone(),
654                with: step.with.clone(),
655            },
656        })
657        .collect::<Vec<_>>();
658    execute_actions(
659        state,
660        &workflow.workflow_id,
661        None,
662        actions,
663        Some(workflow.name.clone()),
664        workflow.description.clone(),
665        workflow.source.clone(),
666        tenant_context,
667        trigger_event,
668        source_event_id,
669        task_id,
670        dry_run,
671    )
672    .await
673}
674
675pub async fn execute_hook_binding(
676    state: &AppState,
677    hook: &WorkflowHookBinding,
678    tenant_context: TenantContext,
679    trigger_event: Option<String>,
680    source_event_id: Option<String>,
681    task_id: Option<String>,
682    dry_run: bool,
683) -> anyhow::Result<WorkflowRunRecord> {
684    let workflow = state
685        .get_workflow(&hook.workflow_id)
686        .await
687        .with_context(|| format!("unknown workflow `{}`", hook.workflow_id))?;
688    execute_actions(
689        state,
690        &hook.workflow_id,
691        Some(hook.binding_id.clone()),
692        hook.actions
693            .iter()
694            .enumerate()
695            .map(|(idx, action)| PreparedWorkflowAction {
696                action_id: format!("action_{}", idx + 1),
697                spec: action.clone(),
698            })
699            .collect(),
700        None,
701        None,
702        workflow.source,
703        tenant_context,
704        trigger_event,
705        source_event_id,
706        task_id,
707        dry_run,
708    )
709    .await
710}
711
712async fn execute_actions(
713    state: &AppState,
714    workflow_id: &str,
715    binding_id: Option<String>,
716    actions: Vec<PreparedWorkflowAction>,
717    workflow_name: Option<String>,
718    workflow_description: Option<String>,
719    source: Option<WorkflowSourceRef>,
720    tenant_context: TenantContext,
721    trigger_event: Option<String>,
722    source_event_id: Option<String>,
723    task_id: Option<String>,
724    dry_run: bool,
725) -> anyhow::Result<WorkflowRunRecord> {
726    let run_id = format!("workflow-run-{}", Uuid::new_v4());
727    let now = now_ms();
728    let automation = compile_workflow_run_automation(
729        workflow_id,
730        workflow_name.as_deref(),
731        workflow_description.as_deref(),
732        binding_id.as_deref(),
733        &actions,
734        source.as_ref(),
735        trigger_event.as_deref(),
736    );
737    let automation = state.put_automation_v2(automation).await?;
738    let automation_run = state
739        .create_automation_v2_run(&automation, trigger_event.as_deref().unwrap_or("workflow"))
740        .await?;
741    let automation_run =
742        sync_workflow_automation_run_start(state, &automation, &automation_run.run_id).await?;
743    let mut run = WorkflowRunRecord {
744        run_id: run_id.clone(),
745        workflow_id: workflow_id.to_string(),
746        tenant_context: tenant_context.clone(),
747        automation_id: Some(automation.automation_id.clone()),
748        automation_run_id: Some(automation_run.run_id.clone()),
749        binding_id,
750        trigger_event: trigger_event.clone(),
751        source_event_id: source_event_id.clone(),
752        task_id: task_id.clone(),
753        status: if dry_run {
754            WorkflowRunStatus::DryRun
755        } else {
756            WorkflowRunStatus::Running
757        },
758        created_at_ms: now,
759        updated_at_ms: now,
760        finished_at_ms: if dry_run { Some(now) } else { None },
761        actions: actions
762            .iter()
763            .map(|action| WorkflowActionRunRecord {
764                action_id: action.action_id.clone(),
765                action: action.spec.action.clone(),
766                task_id: task_id.clone(),
767                status: if dry_run {
768                    WorkflowActionRunStatus::Skipped
769                } else {
770                    WorkflowActionRunStatus::Pending
771                },
772                detail: None,
773                output: None,
774                updated_at_ms: now,
775            })
776            .collect(),
777        source,
778    };
779    state.put_workflow_run(run.clone()).await?;
780    let _ = crate::http::sync_workflow_run_blackboard(state, &run).await;
781    state.event_bus.publish(EngineEvent::new(
782        "workflow.run.started",
783        workflow_event_with_tenant_context(
784            json!({
785            "runID": run.run_id,
786            "workflowID": run.workflow_id,
787            "bindingID": run.binding_id,
788            "triggerEvent": trigger_event,
789            "sourceEventID": source_event_id,
790            "taskID": task_id,
791            "dryRun": dry_run,
792            }),
793            &run.tenant_context,
794        ),
795    ));
796    if dry_run {
797        return Ok(run);
798    }
799    for (action_row, action_spec) in run.actions.iter_mut().zip(actions.iter()) {
800        action_row.status = WorkflowActionRunStatus::Running;
801        action_row.updated_at_ms = now_ms();
802        let action_name = action_row.action.clone();
803        let _ = sync_workflow_automation_action_started(
804            state,
805            &automation,
806            automation_run.run_id.as_str(),
807            &action_row.action_id,
808        )
809        .await;
810        state
811            .update_workflow_run(&run.run_id, |row| {
812                if let Some(target) = row
813                    .actions
814                    .iter_mut()
815                    .find(|item| item.action_id == action_row.action_id)
816                {
817                    *target = action_row.clone();
818                }
819            })
820            .await;
821        if let Some(latest) = state.get_workflow_run(&run.run_id).await {
822            let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
823        }
824        state.event_bus.publish(EngineEvent::new(
825            "workflow.action.started",
826            workflow_event_with_tenant_context(
827                json!({
828                "runID": run.run_id,
829                "workflowID": run.workflow_id,
830                "actionID": action_row.action_id,
831                "action": action_name,
832                "taskID": run.task_id,
833                }),
834                &run.tenant_context,
835            ),
836        ));
837        match execute_action(
838            state,
839            &run.run_id,
840            workflow_id,
841            &action_spec.spec,
842            action_row,
843            &run.tenant_context,
844            trigger_event.clone(),
845        )
846        .await
847        {
848            Ok(output) => {
849                action_row.status = WorkflowActionRunStatus::Completed;
850                action_row.output = Some(output.clone());
851                action_row.updated_at_ms = now_ms();
852                state
853                    .update_workflow_run(&run.run_id, |row| {
854                        if let Some(target) = row
855                            .actions
856                            .iter_mut()
857                            .find(|item| item.action_id == action_row.action_id)
858                        {
859                            *target = action_row.clone();
860                        }
861                    })
862                    .await;
863                let _ = sync_workflow_automation_action_completed(
864                    state,
865                    &automation,
866                    automation_run.run_id.as_str(),
867                    &action_row.action_id,
868                    &output,
869                )
870                .await;
871                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
872                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
873                }
874                state.event_bus.publish(EngineEvent::new(
875                    "workflow.action.completed",
876                    workflow_event_with_tenant_context(
877                        json!({
878                        "runID": run.run_id,
879                        "workflowID": run.workflow_id,
880                        "actionID": action_row.action_id,
881                        "action": action_name,
882                        "taskID": run.task_id,
883                        "output": output,
884                        }),
885                        &run.tenant_context,
886                    ),
887                ));
888            }
889            Err(error) => {
890                let detail = error.to_string();
891                action_row.status = WorkflowActionRunStatus::Failed;
892                action_row.detail = Some(detail.clone());
893                action_row.updated_at_ms = now_ms();
894                run.status = WorkflowRunStatus::Failed;
895                run.finished_at_ms = Some(now_ms());
896                state
897                    .update_workflow_run(&run.run_id, |row| {
898                        row.status = WorkflowRunStatus::Failed;
899                        row.finished_at_ms = Some(now_ms());
900                        if let Some(target) = row
901                            .actions
902                            .iter_mut()
903                            .find(|item| item.action_id == action_row.action_id)
904                        {
905                            *target = action_row.clone();
906                        }
907                    })
908                    .await;
909                let _ = sync_workflow_automation_action_failed(
910                    state,
911                    &automation,
912                    automation_run.run_id.as_str(),
913                    &action_row.action_id,
914                    &detail,
915                )
916                .await;
917                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
918                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
919                }
920                state.event_bus.publish(EngineEvent::new(
921                    "workflow.action.failed",
922                    workflow_event_with_tenant_context(
923                        json!({
924                        "runID": run.run_id,
925                        "workflowID": run.workflow_id,
926                        "actionID": action_row.action_id,
927                        "action": action_name,
928                        "taskID": run.task_id,
929                        "error": detail,
930                        }),
931                        &run.tenant_context,
932                    ),
933                ));
934                state.event_bus.publish(EngineEvent::new(
935                    "workflow.run.failed",
936                    workflow_event_with_tenant_context(
937                        json!({
938                        "runID": run.run_id,
939                        "workflowID": run.workflow_id,
940                        "actionID": action_row.action_id,
941                        "taskID": run.task_id,
942                        "error": action_row.detail,
943                        }),
944                        &run.tenant_context,
945                    ),
946                ));
947                return state.get_workflow_run(&run.run_id).await.with_context(|| {
948                    format!("workflow run `{}` missing after failure", run.run_id)
949                });
950            }
951        }
952    }
953    run.status = WorkflowRunStatus::Completed;
954    run.finished_at_ms = Some(now_ms());
955    let final_run = state
956        .update_workflow_run(&run.run_id, |row| {
957            row.status = WorkflowRunStatus::Completed;
958            row.finished_at_ms = Some(now_ms());
959        })
960        .await
961        .with_context(|| format!("workflow run `{}` missing on completion", run.run_id))?;
962    let _ = crate::http::sync_workflow_run_blackboard(state, &final_run).await;
963    state.event_bus.publish(EngineEvent::new(
964        "workflow.run.completed",
965        workflow_event_with_tenant_context(
966            json!({
967            "runID": final_run.run_id,
968            "workflowID": final_run.workflow_id,
969            "bindingID": final_run.binding_id,
970            "taskID": final_run.task_id,
971            }),
972            &final_run.tenant_context,
973        ),
974    ));
975    Ok(final_run)
976}
977
978async fn execute_action(
979    state: &AppState,
980    run_id: &str,
981    workflow_id: &str,
982    action_spec: &WorkflowActionSpec,
983    action_row: &WorkflowActionRunRecord,
984    tenant_context: &TenantContext,
985    trigger_event: Option<String>,
986) -> anyhow::Result<Value> {
987    let action_name = action_spec.action.as_str();
988    let parsed = parse_workflow_action(action_name);
989    match parsed {
990        ParsedWorkflowAction::EventEmit { event_type } => {
991            let payload = action_payload(action_spec, action_row);
992            state.event_bus.publish(EngineEvent::new(
993                event_type.clone(),
994                workflow_event_with_tenant_context(
995                    json!({
996                    "workflowID": workflow_id,
997                    "actionID": action_row.action_id,
998                    "triggerEvent": trigger_event,
999                    "payload": payload,
1000                    }),
1001                    tenant_context,
1002                ),
1003            ));
1004            Ok(json!({ "eventType": event_type }))
1005        }
1006        ParsedWorkflowAction::ResourcePut { key } => {
1007            let record = state
1008                .put_shared_resource(
1009                    key.clone(),
1010                    action_payload(action_spec, action_row),
1011                    None,
1012                    "workflow".to_string(),
1013                    None,
1014                )
1015                .await
1016                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
1017            Ok(json!({ "key": record.key, "rev": record.rev }))
1018        }
1019        ParsedWorkflowAction::ResourcePatch { key } => {
1020            let current = state.get_shared_resource(&key).await;
1021            let next_rev = current.as_ref().map(|row| row.rev);
1022            let record = state
1023                .put_shared_resource(
1024                    key.clone(),
1025                    merge_object(
1026                        current.map(|row| row.value).unwrap_or_else(|| json!({})),
1027                        action_payload(action_spec, action_row),
1028                    ),
1029                    next_rev,
1030                    "workflow".to_string(),
1031                    None,
1032                )
1033                .await
1034                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
1035            Ok(json!({ "key": record.key, "rev": record.rev }))
1036        }
1037        ParsedWorkflowAction::ResourceDelete { key } => {
1038            let deleted = state
1039                .delete_shared_resource(&key, None)
1040                .await
1041                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
1042            Ok(json!({ "key": key, "deleted": deleted.is_some() }))
1043        }
1044        ParsedWorkflowAction::Tool { tool_name } => {
1045            let payload = action_payload(action_spec, action_row);
1046            let result = state.tools.execute(&tool_name, payload.clone()).await?;
1047            let mut response = json!({
1048                "tool": tool_name,
1049                "output": result.output,
1050                "metadata": result.metadata,
1051            });
1052            if let Some(external_action) = record_workflow_external_action(
1053                state,
1054                run_id,
1055                workflow_id,
1056                action_row,
1057                trigger_event.clone(),
1058                WorkflowExternalActionExecution::Tool {
1059                    tool_name: tool_name.clone(),
1060                },
1061                &payload,
1062                &response,
1063            )
1064            .await?
1065            {
1066                if let Some(obj) = response.as_object_mut() {
1067                    obj.insert("external_action".to_string(), external_action);
1068                }
1069            }
1070            Ok(response)
1071        }
1072        ParsedWorkflowAction::Capability { capability_id } => {
1073            let bindings = state.capability_resolver.list_bindings().await?;
1074            let tool_name = bindings
1075                .bindings
1076                .iter()
1077                .find(|binding| binding.capability_id == capability_id)
1078                .map(|binding| binding.tool_name.clone())
1079                .unwrap_or_else(|| capability_id.clone());
1080            let payload = action_payload(action_spec, action_row);
1081            let result = state.tools.execute(&tool_name, payload.clone()).await?;
1082            let mut response = json!({
1083                "capability": capability_id,
1084                "tool": tool_name,
1085                "output": result.output,
1086                "metadata": result.metadata,
1087            });
1088            if let Some(external_action) = record_workflow_external_action(
1089                state,
1090                run_id,
1091                workflow_id,
1092                action_row,
1093                trigger_event.clone(),
1094                WorkflowExternalActionExecution::Capability {
1095                    capability_id: capability_id.clone(),
1096                    tool_name: tool_name.clone(),
1097                },
1098                &payload,
1099                &response,
1100            )
1101            .await?
1102            {
1103                if let Some(obj) = response.as_object_mut() {
1104                    obj.insert("external_action".to_string(), external_action);
1105                }
1106            }
1107            Ok(response)
1108        }
1109        ParsedWorkflowAction::Workflow { workflow_id } => {
1110            anyhow::bail!("nested workflow action `{workflow_id}` is not supported in this slice")
1111        }
1112        ParsedWorkflowAction::Agent { agent_id } => {
1113            let workspace_root = state.workspace_index.snapshot().await.root;
1114            let session = Session::new(
1115                Some(format!("Workflow {} / {}", workflow_id, agent_id)),
1116                Some(workspace_root.clone()),
1117            );
1118            let session_id = session.id.clone();
1119            state.storage.save_session(session).await?;
1120            let prompt = action_spec
1121                .with
1122                .as_ref()
1123                .and_then(|v| v.get("prompt"))
1124                .and_then(|v| v.as_str())
1125                .map(ToString::to_string)
1126                .unwrap_or_else(|| format!("Execute workflow action `{}`", action_name));
1127            let request = SendMessageRequest {
1128                parts: vec![MessagePartInput::Text { text: prompt }],
1129                model: None,
1130                agent: Some(agent_id.clone()),
1131                tool_mode: None,
1132                tool_allowlist: None,
1133                strict_kb_grounding: None,
1134                context_mode: None,
1135                write_required: None,
1136                prewrite_requirements: None,
1137            };
1138            state
1139                .engine_loop
1140                .run_prompt_async_with_context(
1141                    session_id.clone(),
1142                    request,
1143                    Some(format!("workflow:{workflow_id}")),
1144                )
1145                .await?;
1146            Ok(json!({ "agentID": agent_id, "sessionID": session_id }))
1147        }
1148    }
1149}
1150
1151enum WorkflowExternalActionExecution {
1152    Tool {
1153        tool_name: String,
1154    },
1155    Capability {
1156        capability_id: String,
1157        tool_name: String,
1158    },
1159}
1160
1161async fn record_workflow_external_action(
1162    state: &AppState,
1163    run_id: &str,
1164    workflow_id: &str,
1165    action_row: &WorkflowActionRunRecord,
1166    trigger_event: Option<String>,
1167    execution: WorkflowExternalActionExecution,
1168    payload: &Value,
1169    result: &Value,
1170) -> anyhow::Result<Option<Value>> {
1171    let bindings = state.capability_resolver.list_bindings().await?;
1172    let binding = match execution {
1173        WorkflowExternalActionExecution::Tool { ref tool_name } => bindings
1174            .bindings
1175            .iter()
1176            .find(|binding| workflow_binding_matches_tool_name(binding, tool_name)),
1177        WorkflowExternalActionExecution::Capability {
1178            ref capability_id,
1179            ref tool_name,
1180        } => bindings.bindings.iter().find(|binding| {
1181            binding.capability_id == *capability_id
1182                && workflow_binding_matches_tool_name(binding, tool_name)
1183        }),
1184    };
1185    let Some(binding) = binding else {
1186        return Ok(None);
1187    };
1188
1189    let target = workflow_external_action_target(payload, result);
1190    let source_id = format!("{run_id}:{}", action_row.action_id);
1191    let idempotency_key = crate::sha256_hex(&[
1192        workflow_id,
1193        run_id,
1194        &action_row.action_id,
1195        &action_row.action,
1196        &payload.to_string(),
1197    ]);
1198    let action = crate::ExternalActionRecord {
1199        action_id: format!("workflow-external-{}", &idempotency_key[..16]),
1200        operation: binding.capability_id.clone(),
1201        status: "posted".to_string(),
1202        source_kind: Some("workflow".to_string()),
1203        source_id: Some(source_id.clone()),
1204        routine_run_id: None,
1205        context_run_id: Some(crate::http::context_runs::workflow_context_run_id(run_id)),
1206        capability_id: Some(binding.capability_id.clone()),
1207        provider: Some(binding.provider.clone()),
1208        target,
1209        approval_state: Some("executed".to_string()),
1210        idempotency_key: Some(idempotency_key),
1211        receipt: Some(result.clone()),
1212        error: None,
1213        metadata: Some(json!({
1214            "workflowID": workflow_id,
1215            "workflowRunID": run_id,
1216            "actionID": action_row.action_id,
1217            "action": action_row.action,
1218            "taskID": action_row.task_id,
1219            "triggerEvent": trigger_event,
1220            "tool": binding.tool_name,
1221            "provider": binding.provider,
1222            "input": payload,
1223        })),
1224        created_at_ms: action_row.updated_at_ms,
1225        updated_at_ms: action_row.updated_at_ms,
1226    };
1227    let recorded = state.record_external_action(action).await?;
1228    Ok(Some(serde_json::to_value(&recorded)?))
1229}
1230
1231fn workflow_binding_matches_tool_name(
1232    binding: &crate::capability_resolver::CapabilityBinding,
1233    tool_name: &str,
1234) -> bool {
1235    binding.tool_name.eq_ignore_ascii_case(tool_name)
1236        || binding
1237            .tool_name_aliases
1238            .iter()
1239            .any(|alias| alias.eq_ignore_ascii_case(tool_name))
1240}
1241
1242fn workflow_external_action_target(payload: &Value, result: &Value) -> Option<String> {
1243    for candidate in [
1244        payload.pointer("/owner_repo").and_then(Value::as_str),
1245        payload.pointer("/repo").and_then(Value::as_str),
1246        payload.pointer("/repository").and_then(Value::as_str),
1247        payload.pointer("/channel").and_then(Value::as_str),
1248        payload.pointer("/channel_id").and_then(Value::as_str),
1249        payload.pointer("/thread_ts").and_then(Value::as_str),
1250        result.pointer("/metadata/channel").and_then(Value::as_str),
1251        result.pointer("/metadata/repo").and_then(Value::as_str),
1252    ] {
1253        let trimmed = candidate.map(str::trim).unwrap_or_default();
1254        if !trimmed.is_empty() {
1255            return Some(trimmed.to_string());
1256        }
1257    }
1258    None
1259}
1260
1261fn action_payload(action_spec: &WorkflowActionSpec, action_row: &WorkflowActionRunRecord) -> Value {
1262    action_spec
1263        .with
1264        .clone()
1265        .unwrap_or_else(|| json!({ "action_id": action_row.action_id }))
1266}
1267
1268fn merge_object(current: Value, patch: Value) -> Value {
1269    if let (Some(mut current_obj), Some(patch_obj)) =
1270        (current.as_object().cloned(), patch.as_object())
1271    {
1272        for (key, value) in patch_obj {
1273            current_obj.insert(key.clone(), value.clone());
1274        }
1275        Value::Object(current_obj)
1276    } else {
1277        patch
1278    }
1279}
1280
1281fn source_event_id(event: &EngineEvent) -> String {
1282    if let Some(id) = event.properties.get("event_id").and_then(|v| v.as_str()) {
1283        return id.to_string();
1284    }
1285    for key in ["runID", "runId", "task_id", "taskID", "sessionID"] {
1286        if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1287            return format!("{}:{id}", event.event_type);
1288        }
1289    }
1290    format!("{}:{}", event.event_type, event.properties)
1291}
1292
1293fn task_id_from_event(event: &EngineEvent) -> Option<String> {
1294    for key in ["task_id", "taskID", "step_id", "stepID"] {
1295        if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1296            let trimmed = id.trim();
1297            if !trimmed.is_empty() {
1298                return Some(trimmed.to_string());
1299            }
1300        }
1301    }
1302    None
1303}
1304
1305fn event_name_matches(expected: &str, actual: &str) -> bool {
1306    expected.trim().eq_ignore_ascii_case(actual.trim())
1307}