// tandem_server/workflows.rs

1use anyhow::Context;
2use serde_json::{json, Value};
3use tandem_types::{EngineEvent, MessagePartInput, SendMessageRequest, Session};
4use tandem_workflows::{
5    WorkflowActionRunRecord, WorkflowActionRunStatus, WorkflowActionSpec, WorkflowHookBinding,
6    WorkflowRunRecord, WorkflowRunStatus, WorkflowSimulationResult, WorkflowSpec,
7};
8use uuid::Uuid;
9
10use crate::{now_ms, AppState, WorkflowSourceRef};
11
12#[derive(Debug, Clone)]
13struct PreparedWorkflowAction {
14    action_id: String,
15    spec: WorkflowActionSpec,
16}
17
18fn workflow_action_objective(action: &str, with: Option<&Value>) -> String {
19    match with {
20        Some(with) if !with.is_null() => {
21            format!("Execute workflow action `{action}` with payload {with}.")
22        }
23        _ => format!("Execute workflow action `{action}`."),
24    }
25}
26
27fn workflow_manual_schedule() -> crate::AutomationV2Schedule {
28    crate::AutomationV2Schedule {
29        schedule_type: crate::AutomationV2ScheduleType::Manual,
30        cron_expression: None,
31        interval_seconds: None,
32        timezone: "UTC".to_string(),
33        misfire_policy: crate::RoutineMisfirePolicy::RunOnce,
34    }
35}
36
37fn workflow_execution_plan(
38    workflow_id: &str,
39    name: &str,
40    description: Option<String>,
41    actions: &[PreparedWorkflowAction],
42    source_label: &str,
43    source: Option<&WorkflowSourceRef>,
44    trigger_event: Option<&str>,
45) -> crate::WorkflowPlan {
46    crate::WorkflowPlan {
47        plan_id: format!("workflow-plan-{workflow_id}"),
48        planner_version: "workflow_runtime_v1".to_string(),
49        plan_source: source_label.to_string(),
50        original_prompt: description.clone().unwrap_or_else(|| name.to_string()),
51        normalized_prompt: description.clone().unwrap_or_else(|| name.to_string()),
52        confidence: "high".to_string(),
53        title: name.to_string(),
54        description,
55        schedule: workflow_manual_schedule(),
56        execution_target: "automation_v2".to_string(),
57        workspace_root: std::env::current_dir()
58            .unwrap_or_else(|_| std::path::PathBuf::from("."))
59            .to_string_lossy()
60            .to_string(),
61        steps: actions
62            .iter()
63            .map(|action| crate::WorkflowPlanStep {
64                step_id: action.action_id.clone(),
65                kind: "workflow_action".to_string(),
66                objective: workflow_action_objective(
67                    &action.spec.action,
68                    action.spec.with.as_ref(),
69                ),
70                depends_on: Vec::new(),
71                agent_role: "operator".to_string(),
72                input_refs: Vec::new(),
73                output_contract: Some(crate::AutomationFlowOutputContract {
74                    kind: "generic_artifact".to_string(),
75                    validator: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
76                    enforcement: None,
77                    schema: None,
78                    summary_guidance: None,
79                }),
80                metadata: None,
81            })
82            .collect(),
83        requires_integrations: Vec::new(),
84        allowed_mcp_servers: Vec::new(),
85        operator_preferences: Some(json!({
86            "source": source_label,
87            "tool_access_mode": "auto",
88        })),
89        save_options: json!({
90            "origin": source_label,
91            "workflow_source": source,
92            "trigger_event": trigger_event,
93        }),
94    }
95}
96
97pub(crate) fn compile_workflow_spec_to_automation_preview(
98    workflow: &WorkflowSpec,
99) -> crate::AutomationV2Spec {
100    let actions = workflow
101        .steps
102        .iter()
103        .map(|step| PreparedWorkflowAction {
104            action_id: step.step_id.clone(),
105            spec: WorkflowActionSpec {
106                action: step.action.clone(),
107                with: step.with.clone(),
108            },
109        })
110        .collect::<Vec<_>>();
111    let mut automation = crate::http::compile_plan_to_automation_v2(
112        &workflow_execution_plan(
113            &workflow.workflow_id,
114            &workflow.name,
115            workflow.description.clone(),
116            &actions,
117            "workflow_registry",
118            workflow.source.as_ref(),
119            None,
120        ),
121        None,
122        "workflow_registry",
123    );
124    if let Some(metadata) = automation.metadata.as_mut().and_then(Value::as_object_mut) {
125        metadata.insert("workflow_id".to_string(), json!(workflow.workflow_id));
126        metadata.insert("workflow_name".to_string(), json!(workflow.name));
127        metadata.insert("workflow_source".to_string(), json!(workflow.source));
128        metadata.insert("workflow_enabled".to_string(), json!(workflow.enabled));
129    }
130    automation
131}
132
133fn compile_workflow_run_automation(
134    workflow_id: &str,
135    workflow_name: Option<&str>,
136    workflow_description: Option<&str>,
137    binding_id: Option<&str>,
138    actions: &[PreparedWorkflowAction],
139    source: Option<&WorkflowSourceRef>,
140    trigger_event: Option<&str>,
141) -> crate::AutomationV2Spec {
142    let automation_id = binding_id
143        .map(|binding| format!("workflow-hook-automation-{workflow_id}-{binding}"))
144        .unwrap_or_else(|| format!("workflow-automation-{workflow_id}"));
145    let title = binding_id
146        .map(|binding| {
147            workflow_name
148                .map(|name| format!("{name} hook {binding}"))
149                .unwrap_or_else(|| format!("Workflow Hook {workflow_id}:{binding}"))
150        })
151        .unwrap_or_else(|| {
152            workflow_name
153                .map(|name| format!("{name} execution"))
154                .unwrap_or_else(|| format!("Workflow {workflow_id}"))
155        });
156    let mut automation = crate::http::compile_plan_to_automation_v2(
157        &workflow_execution_plan(
158            &automation_id,
159            &title,
160            Some(
161                workflow_description
162                    .map(|description| description.to_string())
163                    .unwrap_or_else(|| format!("Mirrored workflow execution for `{workflow_id}`.")),
164            ),
165            actions,
166            "workflow_runtime",
167            source,
168            trigger_event,
169        ),
170        None,
171        "workflow_runtime",
172    );
173    automation.automation_id = automation_id.clone();
174    automation.name = title;
175    automation.metadata = Some(json!({
176        "workflow_id": workflow_id,
177        "binding_id": binding_id,
178        "workflow_source": source,
179        "trigger_event": trigger_event,
180        "origin": "workflow_runtime_mirror",
181    }));
182    automation
183}
184
185async fn sync_workflow_automation_run_start(
186    state: &AppState,
187    automation: &crate::AutomationV2Spec,
188    run_id: &str,
189) -> anyhow::Result<crate::AutomationV2RunRecord> {
190    let updated = state
191        .update_automation_v2_run(run_id, |run| {
192            run.status = crate::AutomationRunStatus::Running;
193            run.started_at_ms.get_or_insert_with(now_ms);
194            crate::app::state::automation::lifecycle::record_automation_lifecycle_event(
195                run,
196                "workflow_run_started",
197                Some("workflow runtime mirror started".to_string()),
198                None,
199            );
200            crate::app::state::refresh_automation_runtime_state(automation, run);
201        })
202        .await
203        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
204    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
205        .await
206        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
207    Ok(updated)
208}
209
210async fn sync_workflow_automation_action_started(
211    state: &AppState,
212    automation: &crate::AutomationV2Spec,
213    run_id: &str,
214    action_id: &str,
215) -> anyhow::Result<crate::AutomationV2RunRecord> {
216    let updated = state
217        .update_automation_v2_run(run_id, |run| {
218            let next_attempt = run
219                .checkpoint
220                .node_attempts
221                .get(action_id)
222                .copied()
223                .unwrap_or(0)
224                .saturating_add(1);
225            run.checkpoint
226                .node_attempts
227                .insert(action_id.to_string(), next_attempt);
228            run.detail = Some(format!("Running workflow action `{action_id}`"));
229            crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
230                run,
231                "workflow_action_started",
232                Some(format!("workflow action `{action_id}` started")),
233                None,
234                Some(json!({
235                    "action_id": action_id,
236                    "attempt": next_attempt,
237                })),
238            );
239            crate::app::state::refresh_automation_runtime_state(automation, run);
240        })
241        .await
242        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
243    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
244        .await
245        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
246    Ok(updated)
247}
248
249async fn sync_workflow_automation_action_completed(
250    state: &AppState,
251    automation: &crate::AutomationV2Spec,
252    run_id: &str,
253    action_id: &str,
254    output: &Value,
255) -> anyhow::Result<crate::AutomationV2RunRecord> {
256    let action_count = automation.flow.nodes.len();
257    let updated = state
258        .update_automation_v2_run(run_id, |run| {
259            run.checkpoint.pending_nodes.retain(|id| id != action_id);
260            if !run
261                .checkpoint
262                .completed_nodes
263                .iter()
264                .any(|id| id == action_id)
265            {
266                run.checkpoint.completed_nodes.push(action_id.to_string());
267            }
268            run.checkpoint.node_outputs.insert(
269                action_id.to_string(),
270                json!(crate::AutomationNodeOutput {
271                    contract_kind: "generic_artifact".to_string(),
272                    validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
273                    validator_summary: Some(crate::AutomationValidatorSummary {
274                        kind: crate::AutomationOutputValidatorKind::GenericArtifact,
275                        outcome: "accepted".to_string(),
276                        reason: Some("workflow action completed".to_string()),
277                        unmet_requirements: Vec::new(),
278                        warning_requirements: Vec::new(),
279                        warning_count: 0,
280                        accepted_candidate_source: Some("workflow_runtime".to_string()),
281                        verification_outcome: Some("not_applicable".to_string()),
282                        validation_basis: None,
283                        repair_attempted: false,
284                        repair_attempt: 0,
285                        repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
286                            as u32,
287                        repair_succeeded: false,
288                        repair_exhausted: false,
289                    }),
290                    summary: format!("Workflow action `{action_id}` completed"),
291                    content: json!({
292                        "action_id": action_id,
293                        "output": output,
294                    }),
295                    created_at_ms: now_ms(),
296                    node_id: action_id.to_string(),
297                    status: Some("completed".to_string()),
298                    blocked_reason: None,
299                    approved: None,
300                    workflow_class: Some("workflow_action".to_string()),
301                    phase: Some("execution".to_string()),
302                    failure_kind: None,
303                    tool_telemetry: None,
304                    preflight: None,
305                    knowledge_preflight: None,
306                    capability_resolution: None,
307                    attempt_evidence: None,
308                    blocker_category: None,
309                    fallback_used: None,
310                    artifact_validation: None,
311                    receipt_timeline: None,
312                    quality_mode: Some("strict_research_v1".to_string()),
313                    requested_quality_mode: None,
314                    emergency_rollback_enabled: Some(false),
315                    provenance: Some(crate::AutomationNodeOutputProvenance {
316                        session_id: format!("workflow-runtime-{run_id}"),
317                        node_id: action_id.to_string(),
318                        run_id: Some(run_id.to_string()),
319                        output_path: None,
320                        content_digest: None,
321                        accepted_candidate_source: Some("workflow_runtime".to_string()),
322                        validation_outcome: Some("not_applicable".to_string()),
323                        repair_attempt: Some(0),
324                        repair_succeeded: Some(false),
325                        reuse_allowed: Some(false),
326                        freshness: crate::AutomationNodeOutputFreshness {
327                            current_run: true,
328                            current_attempt: true,
329                        },
330                    }),
331                }),
332            );
333            if run.checkpoint.completed_nodes.len() >= action_count {
334                run.status = crate::AutomationRunStatus::Completed;
335                run.detail = Some("workflow runtime mirror completed".to_string());
336            }
337            crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
338                run,
339                "workflow_action_completed",
340                Some(format!("workflow action `{action_id}` completed")),
341                None,
342                Some(json!({
343                    "action_id": action_id,
344                })),
345            );
346            crate::app::state::refresh_automation_runtime_state(automation, run);
347        })
348        .await
349        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
350    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
351        .await
352        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
353    Ok(updated)
354}
355
356async fn sync_workflow_automation_action_failed(
357    state: &AppState,
358    automation: &crate::AutomationV2Spec,
359    run_id: &str,
360    action_id: &str,
361    error: &str,
362) -> anyhow::Result<crate::AutomationV2RunRecord> {
363    let updated = state
364        .update_automation_v2_run(run_id, |run| {
365            run.status = crate::AutomationRunStatus::Failed;
366            run.detail = Some(format!("Workflow action `{action_id}` failed"));
367            run.checkpoint.pending_nodes.retain(|id| id != action_id);
368            run.checkpoint.last_failure = Some(crate::AutomationFailureRecord {
369                node_id: action_id.to_string(),
370                reason: error.to_string(),
371                failed_at_ms: now_ms(),
372            });
373            run.checkpoint.node_outputs.insert(
374                action_id.to_string(),
375                json!(crate::AutomationNodeOutput {
376                    contract_kind: "generic_artifact".to_string(),
377                    validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
378                    validator_summary: Some(crate::AutomationValidatorSummary {
379                        kind: crate::AutomationOutputValidatorKind::GenericArtifact,
380                        outcome: "rejected".to_string(),
381                        reason: Some(error.to_string()),
382                        unmet_requirements: vec![error.to_string()],
383                        warning_requirements: Vec::new(),
384                        warning_count: 0,
385                        accepted_candidate_source: None,
386                        verification_outcome: Some("failed".to_string()),
387                        validation_basis: None,
388                        repair_attempted: false,
389                        repair_attempt: 0,
390                        repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
391                            as u32,
392                        repair_succeeded: false,
393                        repair_exhausted: false,
394                    }),
395                    summary: format!("Workflow action `{action_id}` failed"),
396                    content: json!({
397                        "action_id": action_id,
398                        "error": error,
399                    }),
400                    created_at_ms: now_ms(),
401                    node_id: action_id.to_string(),
402                    status: Some("failed".to_string()),
403                    blocked_reason: Some(error.to_string()),
404                    approved: None,
405                    workflow_class: Some("workflow_action".to_string()),
406                    phase: Some("execution".to_string()),
407                    failure_kind: Some("workflow_action_failed".to_string()),
408                    tool_telemetry: None,
409                    preflight: None,
410                    knowledge_preflight: None,
411                    capability_resolution: None,
412                    attempt_evidence: None,
413                    blocker_category: Some("tool_result_unusable".to_string()),
414                    fallback_used: None,
415                    artifact_validation: None,
416                    receipt_timeline: None,
417                    quality_mode: Some("strict_research_v1".to_string()),
418                    requested_quality_mode: None,
419                    emergency_rollback_enabled: Some(false),
420                    provenance: Some(crate::AutomationNodeOutputProvenance {
421                        session_id: format!("workflow-runtime-{run_id}"),
422                        node_id: action_id.to_string(),
423                        run_id: Some(run_id.to_string()),
424                        output_path: None,
425                        content_digest: None,
426                        accepted_candidate_source: None,
427                        validation_outcome: Some("failed".to_string()),
428                        repair_attempt: Some(0),
429                        repair_succeeded: Some(false),
430                        reuse_allowed: Some(false),
431                        freshness: crate::AutomationNodeOutputFreshness {
432                            current_run: true,
433                            current_attempt: true,
434                        },
435                    }),
436                }),
437            );
438            crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
439                run,
440                "workflow_action_failed",
441                Some(format!("workflow action `{action_id}` failed")),
442                None,
443                Some(json!({
444                    "action_id": action_id,
445                    "error": error,
446                })),
447            );
448            crate::app::state::refresh_automation_runtime_state(automation, run);
449        })
450        .await
451        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
452    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
453        .await
454        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
455    Ok(updated)
456}
457
/// The structured form of a workflow action string, as produced by
/// [`parse_workflow_action`].
///
/// Each variant corresponds to one recognised prefix of the action string and carries
/// the trimmed remainder.
#[derive(Clone, Debug)]
pub enum ParsedWorkflowAction {
    /// `event:<type>`
    EventEmit { event_type: String },
    /// `resource:put:<key>`
    ResourcePut { key: String },
    /// `resource:patch:<key>`
    ResourcePatch { key: String },
    /// `resource:delete:<key>`
    ResourceDelete { key: String },
    /// `tool:<name>`
    Tool { tool_name: String },
    /// `capability:<id>`, and also any action string with no recognised prefix.
    Capability { capability_id: String },
    /// `workflow:<id>`
    Workflow { workflow_id: String },
    /// `agent:<id>`
    Agent { agent_id: String },
}
469
470pub fn parse_workflow_action(action: &str) -> ParsedWorkflowAction {
471    let trimmed = action.trim();
472    if let Some(rest) = trimmed.strip_prefix("event:") {
473        return ParsedWorkflowAction::EventEmit {
474            event_type: rest.trim().to_string(),
475        };
476    }
477    if let Some(rest) = trimmed.strip_prefix("resource:put:") {
478        return ParsedWorkflowAction::ResourcePut {
479            key: rest.trim().to_string(),
480        };
481    }
482    if let Some(rest) = trimmed.strip_prefix("resource:patch:") {
483        return ParsedWorkflowAction::ResourcePatch {
484            key: rest.trim().to_string(),
485        };
486    }
487    if let Some(rest) = trimmed.strip_prefix("resource:delete:") {
488        return ParsedWorkflowAction::ResourceDelete {
489            key: rest.trim().to_string(),
490        };
491    }
492    if let Some(rest) = trimmed.strip_prefix("tool:") {
493        return ParsedWorkflowAction::Tool {
494            tool_name: rest.trim().to_string(),
495        };
496    }
497    if let Some(rest) = trimmed.strip_prefix("capability:") {
498        return ParsedWorkflowAction::Capability {
499            capability_id: rest.trim().to_string(),
500        };
501    }
502    if let Some(rest) = trimmed.strip_prefix("workflow:") {
503        return ParsedWorkflowAction::Workflow {
504            workflow_id: rest.trim().to_string(),
505        };
506    }
507    if let Some(rest) = trimmed.strip_prefix("agent:") {
508        return ParsedWorkflowAction::Agent {
509            agent_id: rest.trim().to_string(),
510        };
511    }
512    ParsedWorkflowAction::Capability {
513        capability_id: trimmed.to_string(),
514    }
515}
516
517pub fn canonical_workflow_event_names(event: &EngineEvent) -> Vec<String> {
518    let mut names = vec![event.event_type.clone(), event.event_type.replace('.', "_")];
519    match event.event_type.as_str() {
520        "context.task.created" => names.push("task_created".to_string()),
521        "context.task.started" => names.push("task_started".to_string()),
522        "context.task.completed" => names.push("task_completed".to_string()),
523        "context.task.failed" => names.push("task_failed".to_string()),
524        "workflow.run.started" | "routine.run.created" => {
525            names.push("workflow_started".to_string())
526        }
527        "workflow.run.completed" | "routine.run.completed" => {
528            names.push("workflow_completed".to_string())
529        }
530        "workflow.run.failed" | "routine.run.failed" => names.push("task_failed".to_string()),
531        _ => {}
532    }
533    names.sort();
534    names.dedup();
535    names
536}
537
538pub async fn simulate_workflow_event(
539    state: &AppState,
540    event: &EngineEvent,
541) -> WorkflowSimulationResult {
542    let registry = state.workflow_registry().await;
543    let canonical = canonical_workflow_event_names(event);
544    let matched_bindings = registry
545        .hooks
546        .into_iter()
547        .filter(|hook| {
548            hook.enabled
549                && canonical
550                    .iter()
551                    .any(|name| event_name_matches(&hook.event, name))
552        })
553        .collect::<Vec<_>>();
554    let planned_actions = matched_bindings
555        .iter()
556        .flat_map(|hook| hook.actions.clone())
557        .collect::<Vec<_>>();
558    WorkflowSimulationResult {
559        matched_bindings,
560        planned_actions,
561        canonical_events: canonical,
562    }
563}
564
565pub async fn dispatch_workflow_event(state: &AppState, event: &EngineEvent) {
566    let simulation = simulate_workflow_event(state, event).await;
567    if simulation.matched_bindings.is_empty() {
568        return;
569    }
570    for hook in simulation.matched_bindings {
571        let source_event_id = source_event_id(event);
572        let task_id = task_id_from_event(event);
573        let dedupe_key = format!("{}::{source_event_id}", hook.binding_id);
574        {
575            let mut seen = state.workflow_dispatch_seen.write().await;
576            if seen.contains_key(&dedupe_key) {
577                continue;
578            }
579            seen.insert(dedupe_key, now_ms());
580        }
581        let _ = execute_hook_binding(
582            state,
583            &hook,
584            Some(event.event_type.clone()),
585            Some(source_event_id),
586            task_id,
587            false,
588        )
589        .await;
590    }
591}
592
593pub async fn run_workflow_dispatcher(state: AppState) {
594    let mut rx = state.event_bus.subscribe();
595    loop {
596        match rx.recv().await {
597            Ok(event) => dispatch_workflow_event(&state, &event).await,
598            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
599            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
600        }
601    }
602}
603
604pub async fn execute_workflow(
605    state: &AppState,
606    workflow: &WorkflowSpec,
607    trigger_event: Option<String>,
608    source_event_id: Option<String>,
609    task_id: Option<String>,
610    dry_run: bool,
611) -> anyhow::Result<WorkflowRunRecord> {
612    let actions = workflow
613        .steps
614        .iter()
615        .map(|step| PreparedWorkflowAction {
616            action_id: step.step_id.clone(),
617            spec: WorkflowActionSpec {
618                action: step.action.clone(),
619                with: step.with.clone(),
620            },
621        })
622        .collect::<Vec<_>>();
623    execute_actions(
624        state,
625        &workflow.workflow_id,
626        None,
627        actions,
628        Some(workflow.name.clone()),
629        workflow.description.clone(),
630        workflow.source.clone(),
631        trigger_event,
632        source_event_id,
633        task_id,
634        dry_run,
635    )
636    .await
637}
638
639pub async fn execute_hook_binding(
640    state: &AppState,
641    hook: &WorkflowHookBinding,
642    trigger_event: Option<String>,
643    source_event_id: Option<String>,
644    task_id: Option<String>,
645    dry_run: bool,
646) -> anyhow::Result<WorkflowRunRecord> {
647    let workflow = state
648        .get_workflow(&hook.workflow_id)
649        .await
650        .with_context(|| format!("unknown workflow `{}`", hook.workflow_id))?;
651    execute_actions(
652        state,
653        &hook.workflow_id,
654        Some(hook.binding_id.clone()),
655        hook.actions
656            .iter()
657            .enumerate()
658            .map(|(idx, action)| PreparedWorkflowAction {
659                action_id: format!("action_{}", idx + 1),
660                spec: action.clone(),
661            })
662            .collect(),
663        None,
664        None,
665        workflow.source,
666        trigger_event,
667        source_event_id,
668        task_id,
669        dry_run,
670    )
671    .await
672}
673
674async fn execute_actions(
675    state: &AppState,
676    workflow_id: &str,
677    binding_id: Option<String>,
678    actions: Vec<PreparedWorkflowAction>,
679    workflow_name: Option<String>,
680    workflow_description: Option<String>,
681    source: Option<WorkflowSourceRef>,
682    trigger_event: Option<String>,
683    source_event_id: Option<String>,
684    task_id: Option<String>,
685    dry_run: bool,
686) -> anyhow::Result<WorkflowRunRecord> {
687    let run_id = format!("workflow-run-{}", Uuid::new_v4());
688    let now = now_ms();
689    let automation = compile_workflow_run_automation(
690        workflow_id,
691        workflow_name.as_deref(),
692        workflow_description.as_deref(),
693        binding_id.as_deref(),
694        &actions,
695        source.as_ref(),
696        trigger_event.as_deref(),
697    );
698    let automation = state.put_automation_v2(automation).await?;
699    let automation_run = state
700        .create_automation_v2_run(&automation, trigger_event.as_deref().unwrap_or("workflow"))
701        .await?;
702    let automation_run =
703        sync_workflow_automation_run_start(state, &automation, &automation_run.run_id).await?;
704    let mut run = WorkflowRunRecord {
705        run_id: run_id.clone(),
706        workflow_id: workflow_id.to_string(),
707        automation_id: Some(automation.automation_id.clone()),
708        automation_run_id: Some(automation_run.run_id.clone()),
709        binding_id,
710        trigger_event: trigger_event.clone(),
711        source_event_id: source_event_id.clone(),
712        task_id: task_id.clone(),
713        status: if dry_run {
714            WorkflowRunStatus::DryRun
715        } else {
716            WorkflowRunStatus::Running
717        },
718        created_at_ms: now,
719        updated_at_ms: now,
720        finished_at_ms: if dry_run { Some(now) } else { None },
721        actions: actions
722            .iter()
723            .map(|action| WorkflowActionRunRecord {
724                action_id: action.action_id.clone(),
725                action: action.spec.action.clone(),
726                task_id: task_id.clone(),
727                status: if dry_run {
728                    WorkflowActionRunStatus::Skipped
729                } else {
730                    WorkflowActionRunStatus::Pending
731                },
732                detail: None,
733                output: None,
734                updated_at_ms: now,
735            })
736            .collect(),
737        source,
738    };
739    state.put_workflow_run(run.clone()).await?;
740    let _ = crate::http::sync_workflow_run_blackboard(state, &run).await;
741    state.event_bus.publish(EngineEvent::new(
742        "workflow.run.started",
743        json!({
744            "runID": run.run_id,
745            "workflowID": run.workflow_id,
746            "bindingID": run.binding_id,
747            "triggerEvent": trigger_event,
748            "sourceEventID": source_event_id,
749            "taskID": task_id,
750            "dryRun": dry_run,
751        }),
752    ));
753    if dry_run {
754        return Ok(run);
755    }
756    for (action_row, action_spec) in run.actions.iter_mut().zip(actions.iter()) {
757        action_row.status = WorkflowActionRunStatus::Running;
758        action_row.updated_at_ms = now_ms();
759        let action_name = action_row.action.clone();
760        let _ = sync_workflow_automation_action_started(
761            state,
762            &automation,
763            automation_run.run_id.as_str(),
764            &action_row.action_id,
765        )
766        .await;
767        state
768            .update_workflow_run(&run.run_id, |row| {
769                if let Some(target) = row
770                    .actions
771                    .iter_mut()
772                    .find(|item| item.action_id == action_row.action_id)
773                {
774                    *target = action_row.clone();
775                }
776            })
777            .await;
778        if let Some(latest) = state.get_workflow_run(&run.run_id).await {
779            let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
780        }
781        state.event_bus.publish(EngineEvent::new(
782            "workflow.action.started",
783            json!({
784                "runID": run.run_id,
785                "workflowID": run.workflow_id,
786                "actionID": action_row.action_id,
787                "action": action_name,
788                "taskID": run.task_id,
789            }),
790        ));
791        match execute_action(
792            state,
793            &run.run_id,
794            workflow_id,
795            &action_spec.spec,
796            action_row,
797            trigger_event.clone(),
798        )
799        .await
800        {
801            Ok(output) => {
802                action_row.status = WorkflowActionRunStatus::Completed;
803                action_row.output = Some(output.clone());
804                action_row.updated_at_ms = now_ms();
805                state
806                    .update_workflow_run(&run.run_id, |row| {
807                        if let Some(target) = row
808                            .actions
809                            .iter_mut()
810                            .find(|item| item.action_id == action_row.action_id)
811                        {
812                            *target = action_row.clone();
813                        }
814                    })
815                    .await;
816                let _ = sync_workflow_automation_action_completed(
817                    state,
818                    &automation,
819                    automation_run.run_id.as_str(),
820                    &action_row.action_id,
821                    &output,
822                )
823                .await;
824                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
825                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
826                }
827                state.event_bus.publish(EngineEvent::new(
828                    "workflow.action.completed",
829                    json!({
830                        "runID": run.run_id,
831                        "workflowID": run.workflow_id,
832                        "actionID": action_row.action_id,
833                        "action": action_name,
834                        "taskID": run.task_id,
835                        "output": output,
836                    }),
837                ));
838            }
839            Err(error) => {
840                let detail = error.to_string();
841                action_row.status = WorkflowActionRunStatus::Failed;
842                action_row.detail = Some(detail.clone());
843                action_row.updated_at_ms = now_ms();
844                run.status = WorkflowRunStatus::Failed;
845                run.finished_at_ms = Some(now_ms());
846                state
847                    .update_workflow_run(&run.run_id, |row| {
848                        row.status = WorkflowRunStatus::Failed;
849                        row.finished_at_ms = Some(now_ms());
850                        if let Some(target) = row
851                            .actions
852                            .iter_mut()
853                            .find(|item| item.action_id == action_row.action_id)
854                        {
855                            *target = action_row.clone();
856                        }
857                    })
858                    .await;
859                let _ = sync_workflow_automation_action_failed(
860                    state,
861                    &automation,
862                    automation_run.run_id.as_str(),
863                    &action_row.action_id,
864                    &detail,
865                )
866                .await;
867                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
868                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
869                }
870                state.event_bus.publish(EngineEvent::new(
871                    "workflow.action.failed",
872                    json!({
873                        "runID": run.run_id,
874                        "workflowID": run.workflow_id,
875                        "actionID": action_row.action_id,
876                        "action": action_name,
877                        "taskID": run.task_id,
878                        "error": detail,
879                    }),
880                ));
881                state.event_bus.publish(EngineEvent::new(
882                    "workflow.run.failed",
883                    json!({
884                        "runID": run.run_id,
885                        "workflowID": run.workflow_id,
886                        "actionID": action_row.action_id,
887                        "taskID": run.task_id,
888                        "error": action_row.detail,
889                    }),
890                ));
891                return state.get_workflow_run(&run.run_id).await.with_context(|| {
892                    format!("workflow run `{}` missing after failure", run.run_id)
893                });
894            }
895        }
896    }
897    run.status = WorkflowRunStatus::Completed;
898    run.finished_at_ms = Some(now_ms());
899    let final_run = state
900        .update_workflow_run(&run.run_id, |row| {
901            row.status = WorkflowRunStatus::Completed;
902            row.finished_at_ms = Some(now_ms());
903        })
904        .await
905        .with_context(|| format!("workflow run `{}` missing on completion", run.run_id))?;
906    let _ = crate::http::sync_workflow_run_blackboard(state, &final_run).await;
907    state.event_bus.publish(EngineEvent::new(
908        "workflow.run.completed",
909        json!({
910            "runID": final_run.run_id,
911            "workflowID": final_run.workflow_id,
912            "bindingID": final_run.binding_id,
913            "taskID": final_run.task_id,
914        }),
915    ));
916    Ok(final_run)
917}
918
919async fn execute_action(
920    state: &AppState,
921    run_id: &str,
922    workflow_id: &str,
923    action_spec: &WorkflowActionSpec,
924    action_row: &WorkflowActionRunRecord,
925    trigger_event: Option<String>,
926) -> anyhow::Result<Value> {
927    let action_name = action_spec.action.as_str();
928    let parsed = parse_workflow_action(action_name);
929    match parsed {
930        ParsedWorkflowAction::EventEmit { event_type } => {
931            let payload = action_payload(action_spec, action_row);
932            state.event_bus.publish(EngineEvent::new(
933                event_type.clone(),
934                json!({
935                    "workflowID": workflow_id,
936                    "actionID": action_row.action_id,
937                    "triggerEvent": trigger_event,
938                    "payload": payload,
939                }),
940            ));
941            Ok(json!({ "eventType": event_type }))
942        }
943        ParsedWorkflowAction::ResourcePut { key } => {
944            let record = state
945                .put_shared_resource(
946                    key.clone(),
947                    action_payload(action_spec, action_row),
948                    None,
949                    "workflow".to_string(),
950                    None,
951                )
952                .await
953                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
954            Ok(json!({ "key": record.key, "rev": record.rev }))
955        }
956        ParsedWorkflowAction::ResourcePatch { key } => {
957            let current = state.get_shared_resource(&key).await;
958            let next_rev = current.as_ref().map(|row| row.rev);
959            let record = state
960                .put_shared_resource(
961                    key.clone(),
962                    merge_object(
963                        current.map(|row| row.value).unwrap_or_else(|| json!({})),
964                        action_payload(action_spec, action_row),
965                    ),
966                    next_rev,
967                    "workflow".to_string(),
968                    None,
969                )
970                .await
971                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
972            Ok(json!({ "key": record.key, "rev": record.rev }))
973        }
974        ParsedWorkflowAction::ResourceDelete { key } => {
975            let deleted = state
976                .delete_shared_resource(&key, None)
977                .await
978                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
979            Ok(json!({ "key": key, "deleted": deleted.is_some() }))
980        }
981        ParsedWorkflowAction::Tool { tool_name } => {
982            let payload = action_payload(action_spec, action_row);
983            let result = state.tools.execute(&tool_name, payload.clone()).await?;
984            let mut response = json!({
985                "tool": tool_name,
986                "output": result.output,
987                "metadata": result.metadata,
988            });
989            if let Some(external_action) = record_workflow_external_action(
990                state,
991                run_id,
992                workflow_id,
993                action_row,
994                trigger_event.clone(),
995                WorkflowExternalActionExecution::Tool {
996                    tool_name: tool_name.clone(),
997                },
998                &payload,
999                &response,
1000            )
1001            .await?
1002            {
1003                if let Some(obj) = response.as_object_mut() {
1004                    obj.insert("external_action".to_string(), external_action);
1005                }
1006            }
1007            Ok(response)
1008        }
1009        ParsedWorkflowAction::Capability { capability_id } => {
1010            let bindings = state.capability_resolver.list_bindings().await?;
1011            let tool_name = bindings
1012                .bindings
1013                .iter()
1014                .find(|binding| binding.capability_id == capability_id)
1015                .map(|binding| binding.tool_name.clone())
1016                .unwrap_or_else(|| capability_id.clone());
1017            let payload = action_payload(action_spec, action_row);
1018            let result = state.tools.execute(&tool_name, payload.clone()).await?;
1019            let mut response = json!({
1020                "capability": capability_id,
1021                "tool": tool_name,
1022                "output": result.output,
1023                "metadata": result.metadata,
1024            });
1025            if let Some(external_action) = record_workflow_external_action(
1026                state,
1027                run_id,
1028                workflow_id,
1029                action_row,
1030                trigger_event.clone(),
1031                WorkflowExternalActionExecution::Capability {
1032                    capability_id: capability_id.clone(),
1033                    tool_name: tool_name.clone(),
1034                },
1035                &payload,
1036                &response,
1037            )
1038            .await?
1039            {
1040                if let Some(obj) = response.as_object_mut() {
1041                    obj.insert("external_action".to_string(), external_action);
1042                }
1043            }
1044            Ok(response)
1045        }
1046        ParsedWorkflowAction::Workflow { workflow_id } => {
1047            anyhow::bail!("nested workflow action `{workflow_id}` is not supported in this slice")
1048        }
1049        ParsedWorkflowAction::Agent { agent_id } => {
1050            let workspace_root = state.workspace_index.snapshot().await.root;
1051            let session = Session::new(
1052                Some(format!("Workflow {} / {}", workflow_id, agent_id)),
1053                Some(workspace_root.clone()),
1054            );
1055            let session_id = session.id.clone();
1056            state.storage.save_session(session).await?;
1057            let prompt = action_spec
1058                .with
1059                .as_ref()
1060                .and_then(|v| v.get("prompt"))
1061                .and_then(|v| v.as_str())
1062                .map(ToString::to_string)
1063                .unwrap_or_else(|| format!("Execute workflow action `{}`", action_name));
1064            let request = SendMessageRequest {
1065                parts: vec![MessagePartInput::Text { text: prompt }],
1066                model: None,
1067                agent: Some(agent_id.clone()),
1068                tool_mode: None,
1069                tool_allowlist: None,
1070                context_mode: None,
1071                write_required: None,
1072                prewrite_requirements: None,
1073            };
1074            state
1075                .engine_loop
1076                .run_prompt_async_with_context(
1077                    session_id.clone(),
1078                    request,
1079                    Some(format!("workflow:{workflow_id}")),
1080                )
1081                .await?;
1082            Ok(json!({ "agentID": agent_id, "sessionID": session_id }))
1083        }
1084    }
1085}
1086
/// Describes how a workflow action reached an external tool, so the matching
/// capability binding can be looked up when the action is recorded.
#[derive(Debug, Clone)]
enum WorkflowExternalActionExecution {
    /// The action named a tool directly.
    Tool {
        /// Name of the tool that was executed.
        tool_name: String,
    },
    /// The action named a capability that was resolved to a tool.
    Capability {
        /// Identifier of the capability requested by the action.
        capability_id: String,
        /// Name of the tool the capability resolved to.
        tool_name: String,
    },
}
1096
1097async fn record_workflow_external_action(
1098    state: &AppState,
1099    run_id: &str,
1100    workflow_id: &str,
1101    action_row: &WorkflowActionRunRecord,
1102    trigger_event: Option<String>,
1103    execution: WorkflowExternalActionExecution,
1104    payload: &Value,
1105    result: &Value,
1106) -> anyhow::Result<Option<Value>> {
1107    let bindings = state.capability_resolver.list_bindings().await?;
1108    let binding = match execution {
1109        WorkflowExternalActionExecution::Tool { ref tool_name } => bindings
1110            .bindings
1111            .iter()
1112            .find(|binding| workflow_binding_matches_tool_name(binding, tool_name)),
1113        WorkflowExternalActionExecution::Capability {
1114            ref capability_id,
1115            ref tool_name,
1116        } => bindings.bindings.iter().find(|binding| {
1117            binding.capability_id == *capability_id
1118                && workflow_binding_matches_tool_name(binding, tool_name)
1119        }),
1120    };
1121    let Some(binding) = binding else {
1122        return Ok(None);
1123    };
1124
1125    let target = workflow_external_action_target(payload, result);
1126    let source_id = format!("{run_id}:{}", action_row.action_id);
1127    let idempotency_key = crate::sha256_hex(&[
1128        workflow_id,
1129        run_id,
1130        &action_row.action_id,
1131        &action_row.action,
1132        &payload.to_string(),
1133    ]);
1134    let action = crate::ExternalActionRecord {
1135        action_id: format!("workflow-external-{}", &idempotency_key[..16]),
1136        operation: binding.capability_id.clone(),
1137        status: "posted".to_string(),
1138        source_kind: Some("workflow".to_string()),
1139        source_id: Some(source_id.clone()),
1140        routine_run_id: None,
1141        context_run_id: Some(crate::http::context_runs::workflow_context_run_id(run_id)),
1142        capability_id: Some(binding.capability_id.clone()),
1143        provider: Some(binding.provider.clone()),
1144        target,
1145        approval_state: Some("executed".to_string()),
1146        idempotency_key: Some(idempotency_key),
1147        receipt: Some(result.clone()),
1148        error: None,
1149        metadata: Some(json!({
1150            "workflowID": workflow_id,
1151            "workflowRunID": run_id,
1152            "actionID": action_row.action_id,
1153            "action": action_row.action,
1154            "taskID": action_row.task_id,
1155            "triggerEvent": trigger_event,
1156            "tool": binding.tool_name,
1157            "provider": binding.provider,
1158            "input": payload,
1159        })),
1160        created_at_ms: action_row.updated_at_ms,
1161        updated_at_ms: action_row.updated_at_ms,
1162    };
1163    let recorded = state.record_external_action(action).await?;
1164    Ok(Some(serde_json::to_value(&recorded)?))
1165}
1166
1167fn workflow_binding_matches_tool_name(
1168    binding: &crate::capability_resolver::CapabilityBinding,
1169    tool_name: &str,
1170) -> bool {
1171    binding.tool_name.eq_ignore_ascii_case(tool_name)
1172        || binding
1173            .tool_name_aliases
1174            .iter()
1175            .any(|alias| alias.eq_ignore_ascii_case(tool_name))
1176}
1177
1178fn workflow_external_action_target(payload: &Value, result: &Value) -> Option<String> {
1179    for candidate in [
1180        payload.pointer("/owner_repo").and_then(Value::as_str),
1181        payload.pointer("/repo").and_then(Value::as_str),
1182        payload.pointer("/repository").and_then(Value::as_str),
1183        payload.pointer("/channel").and_then(Value::as_str),
1184        payload.pointer("/channel_id").and_then(Value::as_str),
1185        payload.pointer("/thread_ts").and_then(Value::as_str),
1186        result.pointer("/metadata/channel").and_then(Value::as_str),
1187        result.pointer("/metadata/repo").and_then(Value::as_str),
1188    ] {
1189        let trimmed = candidate.map(str::trim).unwrap_or_default();
1190        if !trimmed.is_empty() {
1191            return Some(trimmed.to_string());
1192        }
1193    }
1194    None
1195}
1196
1197fn action_payload(action_spec: &WorkflowActionSpec, action_row: &WorkflowActionRunRecord) -> Value {
1198    action_spec
1199        .with
1200        .clone()
1201        .unwrap_or_else(|| json!({ "action_id": action_row.action_id }))
1202}
1203
1204fn merge_object(current: Value, patch: Value) -> Value {
1205    if let (Some(mut current_obj), Some(patch_obj)) =
1206        (current.as_object().cloned(), patch.as_object())
1207    {
1208        for (key, value) in patch_obj {
1209            current_obj.insert(key.clone(), value.clone());
1210        }
1211        Value::Object(current_obj)
1212    } else {
1213        patch
1214    }
1215}
1216
1217fn source_event_id(event: &EngineEvent) -> String {
1218    if let Some(id) = event.properties.get("event_id").and_then(|v| v.as_str()) {
1219        return id.to_string();
1220    }
1221    for key in ["runID", "runId", "task_id", "taskID", "sessionID"] {
1222        if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1223            return format!("{}:{id}", event.event_type);
1224        }
1225    }
1226    format!("{}:{}", event.event_type, event.properties)
1227}
1228
1229fn task_id_from_event(event: &EngineEvent) -> Option<String> {
1230    for key in ["task_id", "taskID", "step_id", "stepID"] {
1231        if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1232            let trimmed = id.trim();
1233            if !trimmed.is_empty() {
1234                return Some(trimmed.to_string());
1235            }
1236        }
1237    }
1238    None
1239}
1240
/// Compares two event names, ignoring surrounding whitespace and ASCII case.
fn event_name_matches(expected: &str, actual: &str) -> bool {
    let (wanted, seen) = (expected.trim(), actual.trim());
    wanted.eq_ignore_ascii_case(seen)
}