Skip to main content

tandem_server/
workflows.rs

1use anyhow::Context;
2use serde_json::{json, Value};
3use tandem_types::{EngineEvent, MessagePartInput, SendMessageRequest, Session, TenantContext};
4use tandem_workflows::{
5    WorkflowActionRunRecord, WorkflowActionRunStatus, WorkflowActionSpec, WorkflowHookBinding,
6    WorkflowRunRecord, WorkflowRunStatus, WorkflowSimulationResult, WorkflowSpec,
7};
8use uuid::Uuid;
9
10use crate::{now_ms, AppState, WorkflowSourceRef};
11
12#[derive(Debug, Clone)]
13struct PreparedWorkflowAction {
14    action_id: String,
15    spec: WorkflowActionSpec,
16}
17
18fn workflow_event_tenant_context_value(tenant_context: &TenantContext) -> Value {
19    serde_json::to_value(tenant_context).unwrap_or_else(|_| json!(tenant_context))
20}
21
22fn workflow_event_with_tenant_context(mut payload: Value, tenant_context: &TenantContext) -> Value {
23    if let Some(map) = payload.as_object_mut() {
24        map.entry("tenantContext".to_string())
25            .or_insert_with(|| workflow_event_tenant_context_value(tenant_context));
26    }
27    payload
28}
29
30fn event_tenant_context(event: &EngineEvent) -> TenantContext {
31    event
32        .properties
33        .get("tenantContext")
34        .and_then(|value| serde_json::from_value(value.clone()).ok())
35        .unwrap_or_else(TenantContext::local_implicit)
36}
37
38fn workflow_action_objective(action: &str, with: Option<&Value>) -> String {
39    match with {
40        Some(with) if !with.is_null() => {
41            format!("Execute workflow action `{action}` with payload {with}.")
42        }
43        _ => format!("Execute workflow action `{action}`."),
44    }
45}
46
47fn workflow_manual_schedule() -> crate::AutomationV2Schedule {
48    crate::AutomationV2Schedule {
49        schedule_type: crate::AutomationV2ScheduleType::Manual,
50        cron_expression: None,
51        interval_seconds: None,
52        timezone: "UTC".to_string(),
53        misfire_policy: crate::RoutineMisfirePolicy::RunOnce,
54    }
55}
56
57fn workflow_execution_plan(
58    workflow_id: &str,
59    name: &str,
60    description: Option<String>,
61    actions: &[PreparedWorkflowAction],
62    source_label: &str,
63    source: Option<&WorkflowSourceRef>,
64    trigger_event: Option<&str>,
65) -> crate::WorkflowPlan {
66    crate::WorkflowPlan {
67        plan_id: format!("workflow-plan-{workflow_id}"),
68        planner_version: "workflow_runtime_v1".to_string(),
69        plan_source: source_label.to_string(),
70        original_prompt: description.clone().unwrap_or_else(|| name.to_string()),
71        normalized_prompt: description.clone().unwrap_or_else(|| name.to_string()),
72        confidence: "high".to_string(),
73        title: name.to_string(),
74        description,
75        schedule: workflow_manual_schedule(),
76        execution_target: "automation_v2".to_string(),
77        workspace_root: std::env::current_dir()
78            .unwrap_or_else(|_| std::path::PathBuf::from("."))
79            .to_string_lossy()
80            .to_string(),
81        steps: actions
82            .iter()
83            .map(|action| crate::WorkflowPlanStep {
84                step_id: action.action_id.clone(),
85                kind: "workflow_action".to_string(),
86                objective: workflow_action_objective(
87                    &action.spec.action,
88                    action.spec.with.as_ref(),
89                ),
90                depends_on: Vec::new(),
91                agent_role: "operator".to_string(),
92                input_refs: Vec::new(),
93                output_contract: Some(crate::AutomationFlowOutputContract {
94                    kind: "generic_artifact".to_string(),
95                    validator: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
96                    enforcement: None,
97                    schema: None,
98                    summary_guidance: None,
99                }),
100                metadata: None,
101            })
102            .collect(),
103        requires_integrations: Vec::new(),
104        allowed_mcp_servers: Vec::new(),
105        operator_preferences: Some(json!({
106            "source": source_label,
107            "tool_access_mode": "auto",
108        })),
109        save_options: json!({
110            "origin": source_label,
111            "workflow_source": source,
112            "trigger_event": trigger_event,
113        }),
114    }
115}
116
117pub(crate) fn compile_workflow_spec_to_automation_preview(
118    workflow: &WorkflowSpec,
119) -> crate::AutomationV2Spec {
120    let actions = workflow
121        .steps
122        .iter()
123        .map(|step| PreparedWorkflowAction {
124            action_id: step.step_id.clone(),
125            spec: WorkflowActionSpec {
126                action: step.action.clone(),
127                with: step.with.clone(),
128            },
129        })
130        .collect::<Vec<_>>();
131    let mut automation = crate::http::compile_plan_to_automation_v2(
132        &workflow_execution_plan(
133            &workflow.workflow_id,
134            &workflow.name,
135            workflow.description.clone(),
136            &actions,
137            "workflow_registry",
138            workflow.source.as_ref(),
139            None,
140        ),
141        None,
142        "workflow_registry",
143    );
144    if let Some(metadata) = automation.metadata.as_mut().and_then(Value::as_object_mut) {
145        metadata.insert("workflow_id".to_string(), json!(workflow.workflow_id));
146        metadata.insert("workflow_name".to_string(), json!(workflow.name));
147        metadata.insert("workflow_source".to_string(), json!(workflow.source));
148        metadata.insert("workflow_enabled".to_string(), json!(workflow.enabled));
149    }
150    automation
151}
152
153fn compile_workflow_run_automation(
154    workflow_id: &str,
155    workflow_name: Option<&str>,
156    workflow_description: Option<&str>,
157    binding_id: Option<&str>,
158    actions: &[PreparedWorkflowAction],
159    source: Option<&WorkflowSourceRef>,
160    trigger_event: Option<&str>,
161) -> crate::AutomationV2Spec {
162    let automation_id = binding_id
163        .map(|binding| format!("workflow-hook-automation-{workflow_id}-{binding}"))
164        .unwrap_or_else(|| format!("workflow-automation-{workflow_id}"));
165    let title = binding_id
166        .map(|binding| {
167            workflow_name
168                .map(|name| format!("{name} hook {binding}"))
169                .unwrap_or_else(|| format!("Workflow Hook {workflow_id}:{binding}"))
170        })
171        .unwrap_or_else(|| {
172            workflow_name
173                .map(|name| format!("{name} execution"))
174                .unwrap_or_else(|| format!("Workflow {workflow_id}"))
175        });
176    let mut automation = crate::http::compile_plan_to_automation_v2(
177        &workflow_execution_plan(
178            &automation_id,
179            &title,
180            Some(
181                workflow_description
182                    .map(|description| description.to_string())
183                    .unwrap_or_else(|| format!("Mirrored workflow execution for `{workflow_id}`.")),
184            ),
185            actions,
186            "workflow_runtime",
187            source,
188            trigger_event,
189        ),
190        None,
191        "workflow_runtime",
192    );
193    automation.automation_id = automation_id.clone();
194    automation.name = title;
195    automation.metadata = Some(json!({
196        "workflow_id": workflow_id,
197        "binding_id": binding_id,
198        "workflow_source": source,
199        "trigger_event": trigger_event,
200        "origin": "workflow_runtime_mirror",
201    }));
202    automation
203}
204
205async fn sync_workflow_automation_run_start(
206    state: &AppState,
207    automation: &crate::AutomationV2Spec,
208    run_id: &str,
209) -> anyhow::Result<crate::AutomationV2RunRecord> {
210    let updated = state
211        .update_automation_v2_run(run_id, |run| {
212            run.status = crate::AutomationRunStatus::Running;
213            run.started_at_ms.get_or_insert_with(now_ms);
214            crate::app::state::automation::lifecycle::record_automation_lifecycle_event(
215                run,
216                "workflow_run_started",
217                Some("workflow runtime mirror started".to_string()),
218                None,
219            );
220            crate::app::state::refresh_automation_runtime_state(automation, run);
221        })
222        .await
223        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
224    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
225        .await
226        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
227    Ok(updated)
228}
229
230async fn sync_workflow_automation_action_started(
231    state: &AppState,
232    automation: &crate::AutomationV2Spec,
233    run_id: &str,
234    action_id: &str,
235) -> anyhow::Result<crate::AutomationV2RunRecord> {
236    let updated = state
237        .update_automation_v2_run(run_id, |run| {
238            let next_attempt = run
239                .checkpoint
240                .node_attempts
241                .get(action_id)
242                .copied()
243                .unwrap_or(0)
244                .saturating_add(1);
245            run.checkpoint
246                .node_attempts
247                .insert(action_id.to_string(), next_attempt);
248            run.detail = Some(format!("Running workflow action `{action_id}`"));
249            crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
250                run,
251                "workflow_action_started",
252                Some(format!("workflow action `{action_id}` started")),
253                None,
254                Some(json!({
255                    "action_id": action_id,
256                    "attempt": next_attempt,
257                })),
258            );
259            crate::app::state::refresh_automation_runtime_state(automation, run);
260        })
261        .await
262        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
263    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
264        .await
265        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
266    Ok(updated)
267}
268
269async fn sync_workflow_automation_action_completed(
270    state: &AppState,
271    automation: &crate::AutomationV2Spec,
272    run_id: &str,
273    action_id: &str,
274    output: &Value,
275) -> anyhow::Result<crate::AutomationV2RunRecord> {
276    let action_count = automation.flow.nodes.len();
277    let updated = state
278        .update_automation_v2_run(run_id, |run| {
279            run.checkpoint.pending_nodes.retain(|id| id != action_id);
280            if !run
281                .checkpoint
282                .completed_nodes
283                .iter()
284                .any(|id| id == action_id)
285            {
286                run.checkpoint.completed_nodes.push(action_id.to_string());
287            }
288            run.checkpoint.node_outputs.insert(
289                action_id.to_string(),
290                json!(crate::AutomationNodeOutput {
291                    contract_kind: "generic_artifact".to_string(),
292                    validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
293                    validator_summary: Some(crate::AutomationValidatorSummary {
294                        kind: crate::AutomationOutputValidatorKind::GenericArtifact,
295                        outcome: "accepted".to_string(),
296                        reason: Some("workflow action completed".to_string()),
297                        unmet_requirements: Vec::new(),
298                        warning_requirements: Vec::new(),
299                        warning_count: 0,
300                        accepted_candidate_source: Some("workflow_runtime".to_string()),
301                        verification_outcome: Some("not_applicable".to_string()),
302                        validation_basis: None,
303                        repair_attempted: false,
304                        repair_attempt: 0,
305                        repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
306                            as u32,
307                        repair_succeeded: false,
308                        repair_exhausted: false,
309                    }),
310                    summary: format!("Workflow action `{action_id}` completed"),
311                    content: json!({
312                        "action_id": action_id,
313                        "output": output,
314                    }),
315                    created_at_ms: now_ms(),
316                    node_id: action_id.to_string(),
317                    status: Some("completed".to_string()),
318                    blocked_reason: None,
319                    approved: None,
320                    workflow_class: Some("workflow_action".to_string()),
321                    phase: Some("execution".to_string()),
322                    failure_kind: None,
323                    tool_telemetry: None,
324                    preflight: None,
325                    knowledge_preflight: None,
326                    capability_resolution: None,
327                    attempt_evidence: None,
328                    blocker_category: None,
329                    fallback_used: None,
330                    artifact_validation: None,
331                    receipt_timeline: None,
332                    quality_mode: Some("strict_research_v1".to_string()),
333                    requested_quality_mode: None,
334                    emergency_rollback_enabled: Some(false),
335                    provenance: Some(crate::AutomationNodeOutputProvenance {
336                        session_id: format!("workflow-runtime-{run_id}"),
337                        node_id: action_id.to_string(),
338                        run_id: Some(run_id.to_string()),
339                        output_path: None,
340                        content_digest: None,
341                        accepted_candidate_source: Some("workflow_runtime".to_string()),
342                        validation_outcome: Some("not_applicable".to_string()),
343                        repair_attempt: Some(0),
344                        repair_succeeded: Some(false),
345                        reuse_allowed: Some(false),
346                        freshness: crate::AutomationNodeOutputFreshness {
347                            current_run: true,
348                            current_attempt: true,
349                        },
350                    }),
351                }),
352            );
353            if run.checkpoint.completed_nodes.len() >= action_count {
354                run.status = crate::AutomationRunStatus::Completed;
355                run.detail = Some("workflow runtime mirror completed".to_string());
356            }
357            crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
358                run,
359                "workflow_action_completed",
360                Some(format!("workflow action `{action_id}` completed")),
361                None,
362                Some(json!({
363                    "action_id": action_id,
364                })),
365            );
366            crate::app::state::refresh_automation_runtime_state(automation, run);
367        })
368        .await
369        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
370    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
371        .await
372        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
373    Ok(updated)
374}
375
376async fn sync_workflow_automation_action_failed(
377    state: &AppState,
378    automation: &crate::AutomationV2Spec,
379    run_id: &str,
380    action_id: &str,
381    error: &str,
382) -> anyhow::Result<crate::AutomationV2RunRecord> {
383    let updated = state
384        .update_automation_v2_run(run_id, |run| {
385            run.status = crate::AutomationRunStatus::Failed;
386            run.detail = Some(format!("Workflow action `{action_id}` failed"));
387            run.checkpoint.pending_nodes.retain(|id| id != action_id);
388            run.checkpoint.last_failure = Some(crate::AutomationFailureRecord {
389                node_id: action_id.to_string(),
390                reason: error.to_string(),
391                failed_at_ms: now_ms(),
392            });
393            run.checkpoint.node_outputs.insert(
394                action_id.to_string(),
395                json!(crate::AutomationNodeOutput {
396                    contract_kind: "generic_artifact".to_string(),
397                    validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
398                    validator_summary: Some(crate::AutomationValidatorSummary {
399                        kind: crate::AutomationOutputValidatorKind::GenericArtifact,
400                        outcome: "rejected".to_string(),
401                        reason: Some(error.to_string()),
402                        unmet_requirements: vec![error.to_string()],
403                        warning_requirements: Vec::new(),
404                        warning_count: 0,
405                        accepted_candidate_source: None,
406                        verification_outcome: Some("failed".to_string()),
407                        validation_basis: None,
408                        repair_attempted: false,
409                        repair_attempt: 0,
410                        repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
411                            as u32,
412                        repair_succeeded: false,
413                        repair_exhausted: false,
414                    }),
415                    summary: format!("Workflow action `{action_id}` failed"),
416                    content: json!({
417                        "action_id": action_id,
418                        "error": error,
419                    }),
420                    created_at_ms: now_ms(),
421                    node_id: action_id.to_string(),
422                    status: Some("failed".to_string()),
423                    blocked_reason: Some(error.to_string()),
424                    approved: None,
425                    workflow_class: Some("workflow_action".to_string()),
426                    phase: Some("execution".to_string()),
427                    failure_kind: Some("workflow_action_failed".to_string()),
428                    tool_telemetry: None,
429                    preflight: None,
430                    knowledge_preflight: None,
431                    capability_resolution: None,
432                    attempt_evidence: None,
433                    blocker_category: Some("tool_result_unusable".to_string()),
434                    fallback_used: None,
435                    artifact_validation: None,
436                    receipt_timeline: None,
437                    quality_mode: Some("strict_research_v1".to_string()),
438                    requested_quality_mode: None,
439                    emergency_rollback_enabled: Some(false),
440                    provenance: Some(crate::AutomationNodeOutputProvenance {
441                        session_id: format!("workflow-runtime-{run_id}"),
442                        node_id: action_id.to_string(),
443                        run_id: Some(run_id.to_string()),
444                        output_path: None,
445                        content_digest: None,
446                        accepted_candidate_source: None,
447                        validation_outcome: Some("failed".to_string()),
448                        repair_attempt: Some(0),
449                        repair_succeeded: Some(false),
450                        reuse_allowed: Some(false),
451                        freshness: crate::AutomationNodeOutputFreshness {
452                            current_run: true,
453                            current_attempt: true,
454                        },
455                    }),
456                }),
457            );
458            crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
459                run,
460                "workflow_action_failed",
461                Some(format!("workflow action `{action_id}` failed")),
462                None,
463                Some(json!({
464                    "action_id": action_id,
465                    "error": error,
466                })),
467            );
468            crate::app::state::refresh_automation_runtime_state(automation, run);
469        })
470        .await
471        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
472    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
473        .await
474        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
475    Ok(updated)
476}
477
/// A workflow action string decoded into its kind and target.
///
/// Produced by [`parse_workflow_action`]; each variant carries the text that
/// followed the corresponding prefix.
#[derive(Clone, Debug)]
pub enum ParsedWorkflowAction {
    /// `event:<type>`
    EventEmit { event_type: String },
    /// `resource:put:<key>`
    ResourcePut { key: String },
    /// `resource:patch:<key>`
    ResourcePatch { key: String },
    /// `resource:delete:<key>`
    ResourceDelete { key: String },
    /// `tool:<name>`
    Tool { tool_name: String },
    /// `capability:<id>`, and also any action string without a known prefix.
    Capability { capability_id: String },
    /// `workflow:<id>`
    Workflow { workflow_id: String },
    /// `agent:<id>`
    Agent { agent_id: String },
}
489
490pub fn parse_workflow_action(action: &str) -> ParsedWorkflowAction {
491    let trimmed = action.trim();
492    if let Some(rest) = trimmed.strip_prefix("event:") {
493        return ParsedWorkflowAction::EventEmit {
494            event_type: rest.trim().to_string(),
495        };
496    }
497    if let Some(rest) = trimmed.strip_prefix("resource:put:") {
498        return ParsedWorkflowAction::ResourcePut {
499            key: rest.trim().to_string(),
500        };
501    }
502    if let Some(rest) = trimmed.strip_prefix("resource:patch:") {
503        return ParsedWorkflowAction::ResourcePatch {
504            key: rest.trim().to_string(),
505        };
506    }
507    if let Some(rest) = trimmed.strip_prefix("resource:delete:") {
508        return ParsedWorkflowAction::ResourceDelete {
509            key: rest.trim().to_string(),
510        };
511    }
512    if let Some(rest) = trimmed.strip_prefix("tool:") {
513        return ParsedWorkflowAction::Tool {
514            tool_name: rest.trim().to_string(),
515        };
516    }
517    if let Some(rest) = trimmed.strip_prefix("capability:") {
518        return ParsedWorkflowAction::Capability {
519            capability_id: rest.trim().to_string(),
520        };
521    }
522    if let Some(rest) = trimmed.strip_prefix("workflow:") {
523        return ParsedWorkflowAction::Workflow {
524            workflow_id: rest.trim().to_string(),
525        };
526    }
527    if let Some(rest) = trimmed.strip_prefix("agent:") {
528        return ParsedWorkflowAction::Agent {
529            agent_id: rest.trim().to_string(),
530        };
531    }
532    ParsedWorkflowAction::Capability {
533        capability_id: trimmed.to_string(),
534    }
535}
536
537pub fn canonical_workflow_event_names(event: &EngineEvent) -> Vec<String> {
538    let mut names = vec![event.event_type.clone(), event.event_type.replace('.', "_")];
539    match event.event_type.as_str() {
540        "context.task.created" => names.push("task_created".to_string()),
541        "context.task.started" => names.push("task_started".to_string()),
542        "context.task.completed" => names.push("task_completed".to_string()),
543        "context.task.failed" => names.push("task_failed".to_string()),
544        "workflow.run.started" | "routine.run.created" => {
545            names.push("workflow_started".to_string())
546        }
547        "workflow.run.completed" | "routine.run.completed" => {
548            names.push("workflow_completed".to_string())
549        }
550        "workflow.run.failed" | "routine.run.failed" => names.push("task_failed".to_string()),
551        _ => {}
552    }
553    names.sort();
554    names.dedup();
555    names
556}
557
558pub async fn simulate_workflow_event(
559    state: &AppState,
560    event: &EngineEvent,
561) -> WorkflowSimulationResult {
562    let registry = state.workflow_registry().await;
563    let canonical = canonical_workflow_event_names(event);
564    let matched_bindings = registry
565        .hooks
566        .into_iter()
567        .filter(|hook| {
568            hook.enabled
569                && canonical
570                    .iter()
571                    .any(|name| event_name_matches(&hook.event, name))
572        })
573        .collect::<Vec<_>>();
574    let planned_actions = matched_bindings
575        .iter()
576        .flat_map(|hook| hook.actions.clone())
577        .collect::<Vec<_>>();
578    WorkflowSimulationResult {
579        matched_bindings,
580        planned_actions,
581        canonical_events: canonical,
582    }
583}
584
585pub async fn dispatch_workflow_event(state: &AppState, event: &EngineEvent) {
586    let simulation = simulate_workflow_event(state, event).await;
587    if simulation.matched_bindings.is_empty() {
588        return;
589    }
590    for hook in simulation.matched_bindings {
591        let source_event_id = source_event_id(event);
592        let task_id = task_id_from_event(event);
593        let tenant_context = event_tenant_context(event);
594        let dedupe_key = format!("{}::{source_event_id}", hook.binding_id);
595        {
596            let mut seen = state.workflow_dispatch_seen.write().await;
597            if seen.contains_key(&dedupe_key) {
598                continue;
599            }
600            seen.insert(dedupe_key, now_ms());
601        }
602        let _ = execute_hook_binding(
603            state,
604            &hook,
605            tenant_context.clone(),
606            Some(event.event_type.clone()),
607            Some(source_event_id),
608            task_id,
609            false,
610        )
611        .await;
612    }
613}
614
615pub async fn run_workflow_dispatcher(state: AppState) {
616    let mut rx = state.event_bus.subscribe();
617    loop {
618        match rx.recv().await {
619            Ok(event) => dispatch_workflow_event(&state, &event).await,
620            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
621            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
622        }
623    }
624}
625
626pub async fn execute_workflow(
627    state: &AppState,
628    workflow: &WorkflowSpec,
629    tenant_context: TenantContext,
630    trigger_event: Option<String>,
631    source_event_id: Option<String>,
632    task_id: Option<String>,
633    dry_run: bool,
634) -> anyhow::Result<WorkflowRunRecord> {
635    let actions = workflow
636        .steps
637        .iter()
638        .map(|step| PreparedWorkflowAction {
639            action_id: step.step_id.clone(),
640            spec: WorkflowActionSpec {
641                action: step.action.clone(),
642                with: step.with.clone(),
643            },
644        })
645        .collect::<Vec<_>>();
646    execute_actions(
647        state,
648        &workflow.workflow_id,
649        None,
650        actions,
651        Some(workflow.name.clone()),
652        workflow.description.clone(),
653        workflow.source.clone(),
654        tenant_context,
655        trigger_event,
656        source_event_id,
657        task_id,
658        dry_run,
659    )
660    .await
661}
662
663pub async fn execute_hook_binding(
664    state: &AppState,
665    hook: &WorkflowHookBinding,
666    tenant_context: TenantContext,
667    trigger_event: Option<String>,
668    source_event_id: Option<String>,
669    task_id: Option<String>,
670    dry_run: bool,
671) -> anyhow::Result<WorkflowRunRecord> {
672    let workflow = state
673        .get_workflow(&hook.workflow_id)
674        .await
675        .with_context(|| format!("unknown workflow `{}`", hook.workflow_id))?;
676    execute_actions(
677        state,
678        &hook.workflow_id,
679        Some(hook.binding_id.clone()),
680        hook.actions
681            .iter()
682            .enumerate()
683            .map(|(idx, action)| PreparedWorkflowAction {
684                action_id: format!("action_{}", idx + 1),
685                spec: action.clone(),
686            })
687            .collect(),
688        None,
689        None,
690        workflow.source,
691        tenant_context,
692        trigger_event,
693        source_event_id,
694        task_id,
695        dry_run,
696    )
697    .await
698}
699
700async fn execute_actions(
701    state: &AppState,
702    workflow_id: &str,
703    binding_id: Option<String>,
704    actions: Vec<PreparedWorkflowAction>,
705    workflow_name: Option<String>,
706    workflow_description: Option<String>,
707    source: Option<WorkflowSourceRef>,
708    tenant_context: TenantContext,
709    trigger_event: Option<String>,
710    source_event_id: Option<String>,
711    task_id: Option<String>,
712    dry_run: bool,
713) -> anyhow::Result<WorkflowRunRecord> {
714    let run_id = format!("workflow-run-{}", Uuid::new_v4());
715    let now = now_ms();
716    let automation = compile_workflow_run_automation(
717        workflow_id,
718        workflow_name.as_deref(),
719        workflow_description.as_deref(),
720        binding_id.as_deref(),
721        &actions,
722        source.as_ref(),
723        trigger_event.as_deref(),
724    );
725    let automation = state.put_automation_v2(automation).await?;
726    let automation_run = state
727        .create_automation_v2_run(&automation, trigger_event.as_deref().unwrap_or("workflow"))
728        .await?;
729    let automation_run =
730        sync_workflow_automation_run_start(state, &automation, &automation_run.run_id).await?;
731    let mut run = WorkflowRunRecord {
732        run_id: run_id.clone(),
733        workflow_id: workflow_id.to_string(),
734        tenant_context: tenant_context.clone(),
735        automation_id: Some(automation.automation_id.clone()),
736        automation_run_id: Some(automation_run.run_id.clone()),
737        binding_id,
738        trigger_event: trigger_event.clone(),
739        source_event_id: source_event_id.clone(),
740        task_id: task_id.clone(),
741        status: if dry_run {
742            WorkflowRunStatus::DryRun
743        } else {
744            WorkflowRunStatus::Running
745        },
746        created_at_ms: now,
747        updated_at_ms: now,
748        finished_at_ms: if dry_run { Some(now) } else { None },
749        actions: actions
750            .iter()
751            .map(|action| WorkflowActionRunRecord {
752                action_id: action.action_id.clone(),
753                action: action.spec.action.clone(),
754                task_id: task_id.clone(),
755                status: if dry_run {
756                    WorkflowActionRunStatus::Skipped
757                } else {
758                    WorkflowActionRunStatus::Pending
759                },
760                detail: None,
761                output: None,
762                updated_at_ms: now,
763            })
764            .collect(),
765        source,
766    };
767    state.put_workflow_run(run.clone()).await?;
768    let _ = crate::http::sync_workflow_run_blackboard(state, &run).await;
769    state.event_bus.publish(EngineEvent::new(
770        "workflow.run.started",
771        workflow_event_with_tenant_context(
772            json!({
773            "runID": run.run_id,
774            "workflowID": run.workflow_id,
775            "bindingID": run.binding_id,
776            "triggerEvent": trigger_event,
777            "sourceEventID": source_event_id,
778            "taskID": task_id,
779            "dryRun": dry_run,
780            }),
781            &run.tenant_context,
782        ),
783    ));
784    if dry_run {
785        return Ok(run);
786    }
787    for (action_row, action_spec) in run.actions.iter_mut().zip(actions.iter()) {
788        action_row.status = WorkflowActionRunStatus::Running;
789        action_row.updated_at_ms = now_ms();
790        let action_name = action_row.action.clone();
791        let _ = sync_workflow_automation_action_started(
792            state,
793            &automation,
794            automation_run.run_id.as_str(),
795            &action_row.action_id,
796        )
797        .await;
798        state
799            .update_workflow_run(&run.run_id, |row| {
800                if let Some(target) = row
801                    .actions
802                    .iter_mut()
803                    .find(|item| item.action_id == action_row.action_id)
804                {
805                    *target = action_row.clone();
806                }
807            })
808            .await;
809        if let Some(latest) = state.get_workflow_run(&run.run_id).await {
810            let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
811        }
812        state.event_bus.publish(EngineEvent::new(
813            "workflow.action.started",
814            workflow_event_with_tenant_context(
815                json!({
816                "runID": run.run_id,
817                "workflowID": run.workflow_id,
818                "actionID": action_row.action_id,
819                "action": action_name,
820                "taskID": run.task_id,
821                }),
822                &run.tenant_context,
823            ),
824        ));
825        match execute_action(
826            state,
827            &run.run_id,
828            workflow_id,
829            &action_spec.spec,
830            action_row,
831            &run.tenant_context,
832            trigger_event.clone(),
833        )
834        .await
835        {
836            Ok(output) => {
837                action_row.status = WorkflowActionRunStatus::Completed;
838                action_row.output = Some(output.clone());
839                action_row.updated_at_ms = now_ms();
840                state
841                    .update_workflow_run(&run.run_id, |row| {
842                        if let Some(target) = row
843                            .actions
844                            .iter_mut()
845                            .find(|item| item.action_id == action_row.action_id)
846                        {
847                            *target = action_row.clone();
848                        }
849                    })
850                    .await;
851                let _ = sync_workflow_automation_action_completed(
852                    state,
853                    &automation,
854                    automation_run.run_id.as_str(),
855                    &action_row.action_id,
856                    &output,
857                )
858                .await;
859                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
860                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
861                }
862                state.event_bus.publish(EngineEvent::new(
863                    "workflow.action.completed",
864                    workflow_event_with_tenant_context(
865                        json!({
866                        "runID": run.run_id,
867                        "workflowID": run.workflow_id,
868                        "actionID": action_row.action_id,
869                        "action": action_name,
870                        "taskID": run.task_id,
871                        "output": output,
872                        }),
873                        &run.tenant_context,
874                    ),
875                ));
876            }
877            Err(error) => {
878                let detail = error.to_string();
879                action_row.status = WorkflowActionRunStatus::Failed;
880                action_row.detail = Some(detail.clone());
881                action_row.updated_at_ms = now_ms();
882                run.status = WorkflowRunStatus::Failed;
883                run.finished_at_ms = Some(now_ms());
884                state
885                    .update_workflow_run(&run.run_id, |row| {
886                        row.status = WorkflowRunStatus::Failed;
887                        row.finished_at_ms = Some(now_ms());
888                        if let Some(target) = row
889                            .actions
890                            .iter_mut()
891                            .find(|item| item.action_id == action_row.action_id)
892                        {
893                            *target = action_row.clone();
894                        }
895                    })
896                    .await;
897                let _ = sync_workflow_automation_action_failed(
898                    state,
899                    &automation,
900                    automation_run.run_id.as_str(),
901                    &action_row.action_id,
902                    &detail,
903                )
904                .await;
905                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
906                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
907                }
908                state.event_bus.publish(EngineEvent::new(
909                    "workflow.action.failed",
910                    workflow_event_with_tenant_context(
911                        json!({
912                        "runID": run.run_id,
913                        "workflowID": run.workflow_id,
914                        "actionID": action_row.action_id,
915                        "action": action_name,
916                        "taskID": run.task_id,
917                        "error": detail,
918                        }),
919                        &run.tenant_context,
920                    ),
921                ));
922                state.event_bus.publish(EngineEvent::new(
923                    "workflow.run.failed",
924                    workflow_event_with_tenant_context(
925                        json!({
926                        "runID": run.run_id,
927                        "workflowID": run.workflow_id,
928                        "actionID": action_row.action_id,
929                        "taskID": run.task_id,
930                        "error": action_row.detail,
931                        }),
932                        &run.tenant_context,
933                    ),
934                ));
935                return state.get_workflow_run(&run.run_id).await.with_context(|| {
936                    format!("workflow run `{}` missing after failure", run.run_id)
937                });
938            }
939        }
940    }
941    run.status = WorkflowRunStatus::Completed;
942    run.finished_at_ms = Some(now_ms());
943    let final_run = state
944        .update_workflow_run(&run.run_id, |row| {
945            row.status = WorkflowRunStatus::Completed;
946            row.finished_at_ms = Some(now_ms());
947        })
948        .await
949        .with_context(|| format!("workflow run `{}` missing on completion", run.run_id))?;
950    let _ = crate::http::sync_workflow_run_blackboard(state, &final_run).await;
951    state.event_bus.publish(EngineEvent::new(
952        "workflow.run.completed",
953        workflow_event_with_tenant_context(
954            json!({
955            "runID": final_run.run_id,
956            "workflowID": final_run.workflow_id,
957            "bindingID": final_run.binding_id,
958            "taskID": final_run.task_id,
959            }),
960            &final_run.tenant_context,
961        ),
962    ));
963    Ok(final_run)
964}
965
966async fn execute_action(
967    state: &AppState,
968    run_id: &str,
969    workflow_id: &str,
970    action_spec: &WorkflowActionSpec,
971    action_row: &WorkflowActionRunRecord,
972    tenant_context: &TenantContext,
973    trigger_event: Option<String>,
974) -> anyhow::Result<Value> {
975    let action_name = action_spec.action.as_str();
976    let parsed = parse_workflow_action(action_name);
977    match parsed {
978        ParsedWorkflowAction::EventEmit { event_type } => {
979            let payload = action_payload(action_spec, action_row);
980            state.event_bus.publish(EngineEvent::new(
981                event_type.clone(),
982                workflow_event_with_tenant_context(
983                    json!({
984                    "workflowID": workflow_id,
985                    "actionID": action_row.action_id,
986                    "triggerEvent": trigger_event,
987                    "payload": payload,
988                    }),
989                    tenant_context,
990                ),
991            ));
992            Ok(json!({ "eventType": event_type }))
993        }
994        ParsedWorkflowAction::ResourcePut { key } => {
995            let record = state
996                .put_shared_resource(
997                    key.clone(),
998                    action_payload(action_spec, action_row),
999                    None,
1000                    "workflow".to_string(),
1001                    None,
1002                )
1003                .await
1004                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
1005            Ok(json!({ "key": record.key, "rev": record.rev }))
1006        }
1007        ParsedWorkflowAction::ResourcePatch { key } => {
1008            let current = state.get_shared_resource(&key).await;
1009            let next_rev = current.as_ref().map(|row| row.rev);
1010            let record = state
1011                .put_shared_resource(
1012                    key.clone(),
1013                    merge_object(
1014                        current.map(|row| row.value).unwrap_or_else(|| json!({})),
1015                        action_payload(action_spec, action_row),
1016                    ),
1017                    next_rev,
1018                    "workflow".to_string(),
1019                    None,
1020                )
1021                .await
1022                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
1023            Ok(json!({ "key": record.key, "rev": record.rev }))
1024        }
1025        ParsedWorkflowAction::ResourceDelete { key } => {
1026            let deleted = state
1027                .delete_shared_resource(&key, None)
1028                .await
1029                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
1030            Ok(json!({ "key": key, "deleted": deleted.is_some() }))
1031        }
1032        ParsedWorkflowAction::Tool { tool_name } => {
1033            let payload = action_payload(action_spec, action_row);
1034            let result = state.tools.execute(&tool_name, payload.clone()).await?;
1035            let mut response = json!({
1036                "tool": tool_name,
1037                "output": result.output,
1038                "metadata": result.metadata,
1039            });
1040            if let Some(external_action) = record_workflow_external_action(
1041                state,
1042                run_id,
1043                workflow_id,
1044                action_row,
1045                trigger_event.clone(),
1046                WorkflowExternalActionExecution::Tool {
1047                    tool_name: tool_name.clone(),
1048                },
1049                &payload,
1050                &response,
1051            )
1052            .await?
1053            {
1054                if let Some(obj) = response.as_object_mut() {
1055                    obj.insert("external_action".to_string(), external_action);
1056                }
1057            }
1058            Ok(response)
1059        }
1060        ParsedWorkflowAction::Capability { capability_id } => {
1061            let bindings = state.capability_resolver.list_bindings().await?;
1062            let tool_name = bindings
1063                .bindings
1064                .iter()
1065                .find(|binding| binding.capability_id == capability_id)
1066                .map(|binding| binding.tool_name.clone())
1067                .unwrap_or_else(|| capability_id.clone());
1068            let payload = action_payload(action_spec, action_row);
1069            let result = state.tools.execute(&tool_name, payload.clone()).await?;
1070            let mut response = json!({
1071                "capability": capability_id,
1072                "tool": tool_name,
1073                "output": result.output,
1074                "metadata": result.metadata,
1075            });
1076            if let Some(external_action) = record_workflow_external_action(
1077                state,
1078                run_id,
1079                workflow_id,
1080                action_row,
1081                trigger_event.clone(),
1082                WorkflowExternalActionExecution::Capability {
1083                    capability_id: capability_id.clone(),
1084                    tool_name: tool_name.clone(),
1085                },
1086                &payload,
1087                &response,
1088            )
1089            .await?
1090            {
1091                if let Some(obj) = response.as_object_mut() {
1092                    obj.insert("external_action".to_string(), external_action);
1093                }
1094            }
1095            Ok(response)
1096        }
1097        ParsedWorkflowAction::Workflow { workflow_id } => {
1098            anyhow::bail!("nested workflow action `{workflow_id}` is not supported in this slice")
1099        }
1100        ParsedWorkflowAction::Agent { agent_id } => {
1101            let workspace_root = state.workspace_index.snapshot().await.root;
1102            let session = Session::new(
1103                Some(format!("Workflow {} / {}", workflow_id, agent_id)),
1104                Some(workspace_root.clone()),
1105            );
1106            let session_id = session.id.clone();
1107            state.storage.save_session(session).await?;
1108            let prompt = action_spec
1109                .with
1110                .as_ref()
1111                .and_then(|v| v.get("prompt"))
1112                .and_then(|v| v.as_str())
1113                .map(ToString::to_string)
1114                .unwrap_or_else(|| format!("Execute workflow action `{}`", action_name));
1115            let request = SendMessageRequest {
1116                parts: vec![MessagePartInput::Text { text: prompt }],
1117                model: None,
1118                agent: Some(agent_id.clone()),
1119                tool_mode: None,
1120                tool_allowlist: None,
1121                context_mode: None,
1122                write_required: None,
1123                prewrite_requirements: None,
1124            };
1125            state
1126                .engine_loop
1127                .run_prompt_async_with_context(
1128                    session_id.clone(),
1129                    request,
1130                    Some(format!("workflow:{workflow_id}")),
1131                )
1132                .await?;
1133            Ok(json!({ "agentID": agent_id, "sessionID": session_id }))
1134        }
1135    }
1136}
1137
/// Describes how a workflow action reached an external tool, so the matching
/// capability binding can be looked up when the action is recorded.
#[derive(Debug, Clone)]
enum WorkflowExternalActionExecution {
    /// The action named a tool directly.
    Tool {
        /// Name of the tool that was executed.
        tool_name: String,
    },
    /// The action named a capability that was resolved to a tool.
    Capability {
        /// Capability identifier requested by the action.
        capability_id: String,
        /// Name of the tool that was executed for the capability.
        tool_name: String,
    },
}
1147
1148async fn record_workflow_external_action(
1149    state: &AppState,
1150    run_id: &str,
1151    workflow_id: &str,
1152    action_row: &WorkflowActionRunRecord,
1153    trigger_event: Option<String>,
1154    execution: WorkflowExternalActionExecution,
1155    payload: &Value,
1156    result: &Value,
1157) -> anyhow::Result<Option<Value>> {
1158    let bindings = state.capability_resolver.list_bindings().await?;
1159    let binding = match execution {
1160        WorkflowExternalActionExecution::Tool { ref tool_name } => bindings
1161            .bindings
1162            .iter()
1163            .find(|binding| workflow_binding_matches_tool_name(binding, tool_name)),
1164        WorkflowExternalActionExecution::Capability {
1165            ref capability_id,
1166            ref tool_name,
1167        } => bindings.bindings.iter().find(|binding| {
1168            binding.capability_id == *capability_id
1169                && workflow_binding_matches_tool_name(binding, tool_name)
1170        }),
1171    };
1172    let Some(binding) = binding else {
1173        return Ok(None);
1174    };
1175
1176    let target = workflow_external_action_target(payload, result);
1177    let source_id = format!("{run_id}:{}", action_row.action_id);
1178    let idempotency_key = crate::sha256_hex(&[
1179        workflow_id,
1180        run_id,
1181        &action_row.action_id,
1182        &action_row.action,
1183        &payload.to_string(),
1184    ]);
1185    let action = crate::ExternalActionRecord {
1186        action_id: format!("workflow-external-{}", &idempotency_key[..16]),
1187        operation: binding.capability_id.clone(),
1188        status: "posted".to_string(),
1189        source_kind: Some("workflow".to_string()),
1190        source_id: Some(source_id.clone()),
1191        routine_run_id: None,
1192        context_run_id: Some(crate::http::context_runs::workflow_context_run_id(run_id)),
1193        capability_id: Some(binding.capability_id.clone()),
1194        provider: Some(binding.provider.clone()),
1195        target,
1196        approval_state: Some("executed".to_string()),
1197        idempotency_key: Some(idempotency_key),
1198        receipt: Some(result.clone()),
1199        error: None,
1200        metadata: Some(json!({
1201            "workflowID": workflow_id,
1202            "workflowRunID": run_id,
1203            "actionID": action_row.action_id,
1204            "action": action_row.action,
1205            "taskID": action_row.task_id,
1206            "triggerEvent": trigger_event,
1207            "tool": binding.tool_name,
1208            "provider": binding.provider,
1209            "input": payload,
1210        })),
1211        created_at_ms: action_row.updated_at_ms,
1212        updated_at_ms: action_row.updated_at_ms,
1213    };
1214    let recorded = state.record_external_action(action).await?;
1215    Ok(Some(serde_json::to_value(&recorded)?))
1216}
1217
1218fn workflow_binding_matches_tool_name(
1219    binding: &crate::capability_resolver::CapabilityBinding,
1220    tool_name: &str,
1221) -> bool {
1222    binding.tool_name.eq_ignore_ascii_case(tool_name)
1223        || binding
1224            .tool_name_aliases
1225            .iter()
1226            .any(|alias| alias.eq_ignore_ascii_case(tool_name))
1227}
1228
1229fn workflow_external_action_target(payload: &Value, result: &Value) -> Option<String> {
1230    for candidate in [
1231        payload.pointer("/owner_repo").and_then(Value::as_str),
1232        payload.pointer("/repo").and_then(Value::as_str),
1233        payload.pointer("/repository").and_then(Value::as_str),
1234        payload.pointer("/channel").and_then(Value::as_str),
1235        payload.pointer("/channel_id").and_then(Value::as_str),
1236        payload.pointer("/thread_ts").and_then(Value::as_str),
1237        result.pointer("/metadata/channel").and_then(Value::as_str),
1238        result.pointer("/metadata/repo").and_then(Value::as_str),
1239    ] {
1240        let trimmed = candidate.map(str::trim).unwrap_or_default();
1241        if !trimmed.is_empty() {
1242            return Some(trimmed.to_string());
1243        }
1244    }
1245    None
1246}
1247
1248fn action_payload(action_spec: &WorkflowActionSpec, action_row: &WorkflowActionRunRecord) -> Value {
1249    action_spec
1250        .with
1251        .clone()
1252        .unwrap_or_else(|| json!({ "action_id": action_row.action_id }))
1253}
1254
1255fn merge_object(current: Value, patch: Value) -> Value {
1256    if let (Some(mut current_obj), Some(patch_obj)) =
1257        (current.as_object().cloned(), patch.as_object())
1258    {
1259        for (key, value) in patch_obj {
1260            current_obj.insert(key.clone(), value.clone());
1261        }
1262        Value::Object(current_obj)
1263    } else {
1264        patch
1265    }
1266}
1267
1268fn source_event_id(event: &EngineEvent) -> String {
1269    if let Some(id) = event.properties.get("event_id").and_then(|v| v.as_str()) {
1270        return id.to_string();
1271    }
1272    for key in ["runID", "runId", "task_id", "taskID", "sessionID"] {
1273        if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1274            return format!("{}:{id}", event.event_type);
1275        }
1276    }
1277    format!("{}:{}", event.event_type, event.properties)
1278}
1279
1280fn task_id_from_event(event: &EngineEvent) -> Option<String> {
1281    for key in ["task_id", "taskID", "step_id", "stepID"] {
1282        if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1283            let trimmed = id.trim();
1284            if !trimmed.is_empty() {
1285                return Some(trimmed.to_string());
1286            }
1287        }
1288    }
1289    None
1290}
1291
/// Compares two event names, ignoring surrounding whitespace and ASCII case.
fn event_name_matches(expected: &str, actual: &str) -> bool {
    let wanted = expected.trim();
    let seen = actual.trim();
    wanted.eq_ignore_ascii_case(seen)
}