Skip to main content

tandem_server/
workflows.rs

1use anyhow::Context;
2use serde_json::{json, Value};
3use tandem_types::{EngineEvent, MessagePartInput, SendMessageRequest, Session};
4use tandem_workflows::{
5    WorkflowActionRunRecord, WorkflowActionRunStatus, WorkflowActionSpec, WorkflowHookBinding,
6    WorkflowRunRecord, WorkflowRunStatus, WorkflowSimulationResult, WorkflowSpec,
7};
8use uuid::Uuid;
9
10use crate::{now_ms, AppState, WorkflowSourceRef};
11
12#[derive(Debug, Clone)]
13struct PreparedWorkflowAction {
14    action_id: String,
15    spec: WorkflowActionSpec,
16}
17
18fn workflow_action_objective(action: &str, with: Option<&Value>) -> String {
19    match with {
20        Some(with) if !with.is_null() => {
21            format!("Execute workflow action `{action}` with payload {with}.")
22        }
23        _ => format!("Execute workflow action `{action}`."),
24    }
25}
26
27fn workflow_manual_schedule() -> crate::AutomationV2Schedule {
28    crate::AutomationV2Schedule {
29        schedule_type: crate::AutomationV2ScheduleType::Manual,
30        cron_expression: None,
31        interval_seconds: None,
32        timezone: "UTC".to_string(),
33        misfire_policy: crate::RoutineMisfirePolicy::RunOnce,
34    }
35}
36
37fn workflow_execution_plan(
38    workflow_id: &str,
39    name: &str,
40    description: Option<String>,
41    actions: &[PreparedWorkflowAction],
42    source_label: &str,
43    source: Option<&WorkflowSourceRef>,
44    trigger_event: Option<&str>,
45) -> crate::WorkflowPlan {
46    crate::WorkflowPlan {
47        plan_id: format!("workflow-plan-{workflow_id}"),
48        planner_version: "workflow_runtime_v1".to_string(),
49        plan_source: source_label.to_string(),
50        original_prompt: description.clone().unwrap_or_else(|| name.to_string()),
51        normalized_prompt: description.clone().unwrap_or_else(|| name.to_string()),
52        confidence: "high".to_string(),
53        title: name.to_string(),
54        description,
55        schedule: workflow_manual_schedule(),
56        execution_target: "automation_v2".to_string(),
57        workspace_root: std::env::current_dir()
58            .unwrap_or_else(|_| std::path::PathBuf::from("."))
59            .to_string_lossy()
60            .to_string(),
61        steps: actions
62            .iter()
63            .map(|action| crate::WorkflowPlanStep {
64                step_id: action.action_id.clone(),
65                kind: "workflow_action".to_string(),
66                objective: workflow_action_objective(
67                    &action.spec.action,
68                    action.spec.with.as_ref(),
69                ),
70                depends_on: Vec::new(),
71                agent_role: "operator".to_string(),
72                input_refs: Vec::new(),
73                output_contract: Some(crate::AutomationFlowOutputContract {
74                    kind: "generic_artifact".to_string(),
75                    validator: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
76                    enforcement: None,
77                    schema: None,
78                    summary_guidance: None,
79                }),
80                metadata: None,
81            })
82            .collect(),
83        requires_integrations: Vec::new(),
84        allowed_mcp_servers: Vec::new(),
85        operator_preferences: Some(json!({
86            "source": source_label,
87            "tool_access_mode": "auto",
88        })),
89        save_options: json!({
90            "origin": source_label,
91            "workflow_source": source,
92            "trigger_event": trigger_event,
93        }),
94    }
95}
96
97pub(crate) fn compile_workflow_spec_to_automation_preview(
98    workflow: &WorkflowSpec,
99) -> crate::AutomationV2Spec {
100    let actions = workflow
101        .steps
102        .iter()
103        .map(|step| PreparedWorkflowAction {
104            action_id: step.step_id.clone(),
105            spec: WorkflowActionSpec {
106                action: step.action.clone(),
107                with: step.with.clone(),
108            },
109        })
110        .collect::<Vec<_>>();
111    let mut automation = crate::http::compile_plan_to_automation_v2(
112        &workflow_execution_plan(
113            &workflow.workflow_id,
114            &workflow.name,
115            workflow.description.clone(),
116            &actions,
117            "workflow_registry",
118            workflow.source.as_ref(),
119            None,
120        ),
121        None,
122        "workflow_registry",
123    );
124    if let Some(metadata) = automation.metadata.as_mut().and_then(Value::as_object_mut) {
125        metadata.insert("workflow_id".to_string(), json!(workflow.workflow_id));
126        metadata.insert("workflow_name".to_string(), json!(workflow.name));
127        metadata.insert("workflow_source".to_string(), json!(workflow.source));
128        metadata.insert("workflow_enabled".to_string(), json!(workflow.enabled));
129    }
130    automation
131}
132
133fn compile_workflow_run_automation(
134    workflow_id: &str,
135    workflow_name: Option<&str>,
136    workflow_description: Option<&str>,
137    binding_id: Option<&str>,
138    actions: &[PreparedWorkflowAction],
139    source: Option<&WorkflowSourceRef>,
140    trigger_event: Option<&str>,
141) -> crate::AutomationV2Spec {
142    let automation_id = binding_id
143        .map(|binding| format!("workflow-hook-automation-{workflow_id}-{binding}"))
144        .unwrap_or_else(|| format!("workflow-automation-{workflow_id}"));
145    let title = binding_id
146        .map(|binding| {
147            workflow_name
148                .map(|name| format!("{name} hook {binding}"))
149                .unwrap_or_else(|| format!("Workflow Hook {workflow_id}:{binding}"))
150        })
151        .unwrap_or_else(|| {
152            workflow_name
153                .map(|name| format!("{name} execution"))
154                .unwrap_or_else(|| format!("Workflow {workflow_id}"))
155        });
156    let mut automation = crate::http::compile_plan_to_automation_v2(
157        &workflow_execution_plan(
158            &automation_id,
159            &title,
160            Some(
161                workflow_description
162                    .map(|description| description.to_string())
163                    .unwrap_or_else(|| format!("Mirrored workflow execution for `{workflow_id}`.")),
164            ),
165            actions,
166            "workflow_runtime",
167            source,
168            trigger_event,
169        ),
170        None,
171        "workflow_runtime",
172    );
173    automation.automation_id = automation_id.clone();
174    automation.name = title;
175    automation.metadata = Some(json!({
176        "workflow_id": workflow_id,
177        "binding_id": binding_id,
178        "workflow_source": source,
179        "trigger_event": trigger_event,
180        "origin": "workflow_runtime_mirror",
181    }));
182    automation
183}
184
185async fn sync_workflow_automation_run_start(
186    state: &AppState,
187    automation: &crate::AutomationV2Spec,
188    run_id: &str,
189) -> anyhow::Result<crate::AutomationV2RunRecord> {
190    let updated = state
191        .update_automation_v2_run(run_id, |run| {
192            run.status = crate::AutomationRunStatus::Running;
193            run.started_at_ms.get_or_insert_with(now_ms);
194            crate::app::state::automation::lifecycle::record_automation_lifecycle_event(
195                run,
196                "workflow_run_started",
197                Some("workflow runtime mirror started".to_string()),
198                None,
199            );
200            crate::app::state::refresh_automation_runtime_state(automation, run);
201        })
202        .await
203        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
204    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
205        .await
206        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
207    Ok(updated)
208}
209
210async fn sync_workflow_automation_action_started(
211    state: &AppState,
212    automation: &crate::AutomationV2Spec,
213    run_id: &str,
214    action_id: &str,
215) -> anyhow::Result<crate::AutomationV2RunRecord> {
216    let updated = state
217        .update_automation_v2_run(run_id, |run| {
218            let next_attempt = run
219                .checkpoint
220                .node_attempts
221                .get(action_id)
222                .copied()
223                .unwrap_or(0)
224                .saturating_add(1);
225            run.checkpoint
226                .node_attempts
227                .insert(action_id.to_string(), next_attempt);
228            run.detail = Some(format!("Running workflow action `{action_id}`"));
229            crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
230                run,
231                "workflow_action_started",
232                Some(format!("workflow action `{action_id}` started")),
233                None,
234                Some(json!({
235                    "action_id": action_id,
236                    "attempt": next_attempt,
237                })),
238            );
239            crate::app::state::refresh_automation_runtime_state(automation, run);
240        })
241        .await
242        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
243    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
244        .await
245        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
246    Ok(updated)
247}
248
249async fn sync_workflow_automation_action_completed(
250    state: &AppState,
251    automation: &crate::AutomationV2Spec,
252    run_id: &str,
253    action_id: &str,
254    output: &Value,
255) -> anyhow::Result<crate::AutomationV2RunRecord> {
256    let action_count = automation.flow.nodes.len();
257    let updated = state
258        .update_automation_v2_run(run_id, |run| {
259            run.checkpoint.pending_nodes.retain(|id| id != action_id);
260            if !run
261                .checkpoint
262                .completed_nodes
263                .iter()
264                .any(|id| id == action_id)
265            {
266                run.checkpoint.completed_nodes.push(action_id.to_string());
267            }
268            run.checkpoint.node_outputs.insert(
269                action_id.to_string(),
270                json!(crate::AutomationNodeOutput {
271                    contract_kind: "generic_artifact".to_string(),
272                    validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
273                    validator_summary: Some(crate::AutomationValidatorSummary {
274                        kind: crate::AutomationOutputValidatorKind::GenericArtifact,
275                        outcome: "accepted".to_string(),
276                        reason: Some("workflow action completed".to_string()),
277                        unmet_requirements: Vec::new(),
278                        warning_requirements: Vec::new(),
279                        warning_count: 0,
280                        accepted_candidate_source: Some("workflow_runtime".to_string()),
281                        verification_outcome: Some("not_applicable".to_string()),
282                        validation_basis: None,
283                        repair_attempted: false,
284                        repair_attempt: 0,
285                        repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
286                            as u32,
287                        repair_succeeded: false,
288                        repair_exhausted: false,
289                    }),
290                    summary: format!("Workflow action `{action_id}` completed"),
291                    content: json!({
292                        "action_id": action_id,
293                        "output": output,
294                    }),
295                    created_at_ms: now_ms(),
296                    node_id: action_id.to_string(),
297                    status: Some("completed".to_string()),
298                    blocked_reason: None,
299                    approved: None,
300                    workflow_class: Some("workflow_action".to_string()),
301                    phase: Some("execution".to_string()),
302                    failure_kind: None,
303                    tool_telemetry: None,
304                    preflight: None,
305                    capability_resolution: None,
306                    attempt_evidence: None,
307                    blocker_category: None,
308                    fallback_used: None,
309                    artifact_validation: None,
310                    receipt_timeline: None,
311                    quality_mode: Some("strict_research_v1".to_string()),
312                    requested_quality_mode: None,
313                    emergency_rollback_enabled: Some(false),
314                    provenance: Some(crate::AutomationNodeOutputProvenance {
315                        session_id: format!("workflow-runtime-{run_id}"),
316                        node_id: action_id.to_string(),
317                        run_id: Some(run_id.to_string()),
318                        output_path: None,
319                        content_digest: None,
320                        accepted_candidate_source: Some("workflow_runtime".to_string()),
321                        validation_outcome: Some("not_applicable".to_string()),
322                        repair_attempt: Some(0),
323                        repair_succeeded: Some(false),
324                        reuse_allowed: Some(false),
325                        freshness: crate::AutomationNodeOutputFreshness {
326                            current_run: true,
327                            current_attempt: true,
328                        },
329                    }),
330                }),
331            );
332            if run.checkpoint.completed_nodes.len() >= action_count {
333                run.status = crate::AutomationRunStatus::Completed;
334                run.detail = Some("workflow runtime mirror completed".to_string());
335            }
336            crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
337                run,
338                "workflow_action_completed",
339                Some(format!("workflow action `{action_id}` completed")),
340                None,
341                Some(json!({
342                    "action_id": action_id,
343                })),
344            );
345            crate::app::state::refresh_automation_runtime_state(automation, run);
346        })
347        .await
348        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
349    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
350        .await
351        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
352    Ok(updated)
353}
354
355async fn sync_workflow_automation_action_failed(
356    state: &AppState,
357    automation: &crate::AutomationV2Spec,
358    run_id: &str,
359    action_id: &str,
360    error: &str,
361) -> anyhow::Result<crate::AutomationV2RunRecord> {
362    let updated = state
363        .update_automation_v2_run(run_id, |run| {
364            run.status = crate::AutomationRunStatus::Failed;
365            run.detail = Some(format!("Workflow action `{action_id}` failed"));
366            run.checkpoint.pending_nodes.retain(|id| id != action_id);
367            run.checkpoint.last_failure = Some(crate::AutomationFailureRecord {
368                node_id: action_id.to_string(),
369                reason: error.to_string(),
370                failed_at_ms: now_ms(),
371            });
372            run.checkpoint.node_outputs.insert(
373                action_id.to_string(),
374                json!(crate::AutomationNodeOutput {
375                    contract_kind: "generic_artifact".to_string(),
376                    validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
377                    validator_summary: Some(crate::AutomationValidatorSummary {
378                        kind: crate::AutomationOutputValidatorKind::GenericArtifact,
379                        outcome: "rejected".to_string(),
380                        reason: Some(error.to_string()),
381                        unmet_requirements: vec![error.to_string()],
382                        warning_requirements: Vec::new(),
383                        warning_count: 0,
384                        accepted_candidate_source: None,
385                        verification_outcome: Some("failed".to_string()),
386                        validation_basis: None,
387                        repair_attempted: false,
388                        repair_attempt: 0,
389                        repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
390                            as u32,
391                        repair_succeeded: false,
392                        repair_exhausted: false,
393                    }),
394                    summary: format!("Workflow action `{action_id}` failed"),
395                    content: json!({
396                        "action_id": action_id,
397                        "error": error,
398                    }),
399                    created_at_ms: now_ms(),
400                    node_id: action_id.to_string(),
401                    status: Some("failed".to_string()),
402                    blocked_reason: Some(error.to_string()),
403                    approved: None,
404                    workflow_class: Some("workflow_action".to_string()),
405                    phase: Some("execution".to_string()),
406                    failure_kind: Some("workflow_action_failed".to_string()),
407                    tool_telemetry: None,
408                    preflight: None,
409                    capability_resolution: None,
410                    attempt_evidence: None,
411                    blocker_category: Some("tool_result_unusable".to_string()),
412                    fallback_used: None,
413                    artifact_validation: None,
414                    receipt_timeline: None,
415                    quality_mode: Some("strict_research_v1".to_string()),
416                    requested_quality_mode: None,
417                    emergency_rollback_enabled: Some(false),
418                    provenance: Some(crate::AutomationNodeOutputProvenance {
419                        session_id: format!("workflow-runtime-{run_id}"),
420                        node_id: action_id.to_string(),
421                        run_id: Some(run_id.to_string()),
422                        output_path: None,
423                        content_digest: None,
424                        accepted_candidate_source: None,
425                        validation_outcome: Some("failed".to_string()),
426                        repair_attempt: Some(0),
427                        repair_succeeded: Some(false),
428                        reuse_allowed: Some(false),
429                        freshness: crate::AutomationNodeOutputFreshness {
430                            current_run: true,
431                            current_attempt: true,
432                        },
433                    }),
434                }),
435            );
436            crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
437                run,
438                "workflow_action_failed",
439                Some(format!("workflow action `{action_id}` failed")),
440                None,
441                Some(json!({
442                    "action_id": action_id,
443                    "error": error,
444                })),
445            );
446            crate::app::state::refresh_automation_runtime_state(automation, run);
447        })
448        .await
449        .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
450    crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
451        .await
452        .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
453    Ok(updated)
454}
455
456#[derive(Debug, Clone)]
457pub enum ParsedWorkflowAction {
458    EventEmit { event_type: String },
459    ResourcePut { key: String },
460    ResourcePatch { key: String },
461    ResourceDelete { key: String },
462    Tool { tool_name: String },
463    Capability { capability_id: String },
464    Workflow { workflow_id: String },
465    Agent { agent_id: String },
466}
467
468pub fn parse_workflow_action(action: &str) -> ParsedWorkflowAction {
469    let trimmed = action.trim();
470    if let Some(rest) = trimmed.strip_prefix("event:") {
471        return ParsedWorkflowAction::EventEmit {
472            event_type: rest.trim().to_string(),
473        };
474    }
475    if let Some(rest) = trimmed.strip_prefix("resource:put:") {
476        return ParsedWorkflowAction::ResourcePut {
477            key: rest.trim().to_string(),
478        };
479    }
480    if let Some(rest) = trimmed.strip_prefix("resource:patch:") {
481        return ParsedWorkflowAction::ResourcePatch {
482            key: rest.trim().to_string(),
483        };
484    }
485    if let Some(rest) = trimmed.strip_prefix("resource:delete:") {
486        return ParsedWorkflowAction::ResourceDelete {
487            key: rest.trim().to_string(),
488        };
489    }
490    if let Some(rest) = trimmed.strip_prefix("tool:") {
491        return ParsedWorkflowAction::Tool {
492            tool_name: rest.trim().to_string(),
493        };
494    }
495    if let Some(rest) = trimmed.strip_prefix("capability:") {
496        return ParsedWorkflowAction::Capability {
497            capability_id: rest.trim().to_string(),
498        };
499    }
500    if let Some(rest) = trimmed.strip_prefix("workflow:") {
501        return ParsedWorkflowAction::Workflow {
502            workflow_id: rest.trim().to_string(),
503        };
504    }
505    if let Some(rest) = trimmed.strip_prefix("agent:") {
506        return ParsedWorkflowAction::Agent {
507            agent_id: rest.trim().to_string(),
508        };
509    }
510    ParsedWorkflowAction::Capability {
511        capability_id: trimmed.to_string(),
512    }
513}
514
515pub fn canonical_workflow_event_names(event: &EngineEvent) -> Vec<String> {
516    let mut names = vec![event.event_type.clone(), event.event_type.replace('.', "_")];
517    match event.event_type.as_str() {
518        "context.task.created" => names.push("task_created".to_string()),
519        "context.task.started" => names.push("task_started".to_string()),
520        "context.task.completed" => names.push("task_completed".to_string()),
521        "context.task.failed" => names.push("task_failed".to_string()),
522        "workflow.run.started" | "routine.run.created" => {
523            names.push("workflow_started".to_string())
524        }
525        "workflow.run.completed" | "routine.run.completed" => {
526            names.push("workflow_completed".to_string())
527        }
528        "workflow.run.failed" | "routine.run.failed" => names.push("task_failed".to_string()),
529        _ => {}
530    }
531    names.sort();
532    names.dedup();
533    names
534}
535
536pub async fn simulate_workflow_event(
537    state: &AppState,
538    event: &EngineEvent,
539) -> WorkflowSimulationResult {
540    let registry = state.workflow_registry().await;
541    let canonical = canonical_workflow_event_names(event);
542    let matched_bindings = registry
543        .hooks
544        .into_iter()
545        .filter(|hook| {
546            hook.enabled
547                && canonical
548                    .iter()
549                    .any(|name| event_name_matches(&hook.event, name))
550        })
551        .collect::<Vec<_>>();
552    let planned_actions = matched_bindings
553        .iter()
554        .flat_map(|hook| hook.actions.clone())
555        .collect::<Vec<_>>();
556    WorkflowSimulationResult {
557        matched_bindings,
558        planned_actions,
559        canonical_events: canonical,
560    }
561}
562
563pub async fn dispatch_workflow_event(state: &AppState, event: &EngineEvent) {
564    let simulation = simulate_workflow_event(state, event).await;
565    if simulation.matched_bindings.is_empty() {
566        return;
567    }
568    for hook in simulation.matched_bindings {
569        let source_event_id = source_event_id(event);
570        let task_id = task_id_from_event(event);
571        let dedupe_key = format!("{}::{source_event_id}", hook.binding_id);
572        {
573            let mut seen = state.workflow_dispatch_seen.write().await;
574            if seen.contains_key(&dedupe_key) {
575                continue;
576            }
577            seen.insert(dedupe_key, now_ms());
578        }
579        let _ = execute_hook_binding(
580            state,
581            &hook,
582            Some(event.event_type.clone()),
583            Some(source_event_id),
584            task_id,
585            false,
586        )
587        .await;
588    }
589}
590
591pub async fn run_workflow_dispatcher(state: AppState) {
592    let mut rx = state.event_bus.subscribe();
593    loop {
594        match rx.recv().await {
595            Ok(event) => dispatch_workflow_event(&state, &event).await,
596            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
597            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
598        }
599    }
600}
601
602pub async fn execute_workflow(
603    state: &AppState,
604    workflow: &WorkflowSpec,
605    trigger_event: Option<String>,
606    source_event_id: Option<String>,
607    task_id: Option<String>,
608    dry_run: bool,
609) -> anyhow::Result<WorkflowRunRecord> {
610    let actions = workflow
611        .steps
612        .iter()
613        .map(|step| PreparedWorkflowAction {
614            action_id: step.step_id.clone(),
615            spec: WorkflowActionSpec {
616                action: step.action.clone(),
617                with: step.with.clone(),
618            },
619        })
620        .collect::<Vec<_>>();
621    execute_actions(
622        state,
623        &workflow.workflow_id,
624        None,
625        actions,
626        Some(workflow.name.clone()),
627        workflow.description.clone(),
628        workflow.source.clone(),
629        trigger_event,
630        source_event_id,
631        task_id,
632        dry_run,
633    )
634    .await
635}
636
637pub async fn execute_hook_binding(
638    state: &AppState,
639    hook: &WorkflowHookBinding,
640    trigger_event: Option<String>,
641    source_event_id: Option<String>,
642    task_id: Option<String>,
643    dry_run: bool,
644) -> anyhow::Result<WorkflowRunRecord> {
645    let workflow = state
646        .get_workflow(&hook.workflow_id)
647        .await
648        .with_context(|| format!("unknown workflow `{}`", hook.workflow_id))?;
649    execute_actions(
650        state,
651        &hook.workflow_id,
652        Some(hook.binding_id.clone()),
653        hook.actions
654            .iter()
655            .enumerate()
656            .map(|(idx, action)| PreparedWorkflowAction {
657                action_id: format!("action_{}", idx + 1),
658                spec: action.clone(),
659            })
660            .collect(),
661        None,
662        None,
663        workflow.source,
664        trigger_event,
665        source_event_id,
666        task_id,
667        dry_run,
668    )
669    .await
670}
671
672async fn execute_actions(
673    state: &AppState,
674    workflow_id: &str,
675    binding_id: Option<String>,
676    actions: Vec<PreparedWorkflowAction>,
677    workflow_name: Option<String>,
678    workflow_description: Option<String>,
679    source: Option<WorkflowSourceRef>,
680    trigger_event: Option<String>,
681    source_event_id: Option<String>,
682    task_id: Option<String>,
683    dry_run: bool,
684) -> anyhow::Result<WorkflowRunRecord> {
685    let run_id = format!("workflow-run-{}", Uuid::new_v4());
686    let now = now_ms();
687    let automation = compile_workflow_run_automation(
688        workflow_id,
689        workflow_name.as_deref(),
690        workflow_description.as_deref(),
691        binding_id.as_deref(),
692        &actions,
693        source.as_ref(),
694        trigger_event.as_deref(),
695    );
696    let automation = state.put_automation_v2(automation).await?;
697    let automation_run = state
698        .create_automation_v2_run(&automation, trigger_event.as_deref().unwrap_or("workflow"))
699        .await?;
700    let automation_run =
701        sync_workflow_automation_run_start(state, &automation, &automation_run.run_id).await?;
702    let mut run = WorkflowRunRecord {
703        run_id: run_id.clone(),
704        workflow_id: workflow_id.to_string(),
705        automation_id: Some(automation.automation_id.clone()),
706        automation_run_id: Some(automation_run.run_id.clone()),
707        binding_id,
708        trigger_event: trigger_event.clone(),
709        source_event_id: source_event_id.clone(),
710        task_id: task_id.clone(),
711        status: if dry_run {
712            WorkflowRunStatus::DryRun
713        } else {
714            WorkflowRunStatus::Running
715        },
716        created_at_ms: now,
717        updated_at_ms: now,
718        finished_at_ms: if dry_run { Some(now) } else { None },
719        actions: actions
720            .iter()
721            .map(|action| WorkflowActionRunRecord {
722                action_id: action.action_id.clone(),
723                action: action.spec.action.clone(),
724                task_id: task_id.clone(),
725                status: if dry_run {
726                    WorkflowActionRunStatus::Skipped
727                } else {
728                    WorkflowActionRunStatus::Pending
729                },
730                detail: None,
731                output: None,
732                updated_at_ms: now,
733            })
734            .collect(),
735        source,
736    };
737    state.put_workflow_run(run.clone()).await?;
738    let _ = crate::http::sync_workflow_run_blackboard(state, &run).await;
739    state.event_bus.publish(EngineEvent::new(
740        "workflow.run.started",
741        json!({
742            "runID": run.run_id,
743            "workflowID": run.workflow_id,
744            "bindingID": run.binding_id,
745            "triggerEvent": trigger_event,
746            "sourceEventID": source_event_id,
747            "taskID": task_id,
748            "dryRun": dry_run,
749        }),
750    ));
751    if dry_run {
752        return Ok(run);
753    }
754    for (action_row, action_spec) in run.actions.iter_mut().zip(actions.iter()) {
755        action_row.status = WorkflowActionRunStatus::Running;
756        action_row.updated_at_ms = now_ms();
757        let action_name = action_row.action.clone();
758        let _ = sync_workflow_automation_action_started(
759            state,
760            &automation,
761            automation_run.run_id.as_str(),
762            &action_row.action_id,
763        )
764        .await;
765        state
766            .update_workflow_run(&run.run_id, |row| {
767                if let Some(target) = row
768                    .actions
769                    .iter_mut()
770                    .find(|item| item.action_id == action_row.action_id)
771                {
772                    *target = action_row.clone();
773                }
774            })
775            .await;
776        if let Some(latest) = state.get_workflow_run(&run.run_id).await {
777            let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
778        }
779        state.event_bus.publish(EngineEvent::new(
780            "workflow.action.started",
781            json!({
782                "runID": run.run_id,
783                "workflowID": run.workflow_id,
784                "actionID": action_row.action_id,
785                "action": action_name,
786                "taskID": run.task_id,
787            }),
788        ));
789        match execute_action(
790            state,
791            &run.run_id,
792            workflow_id,
793            &action_spec.spec,
794            action_row,
795            trigger_event.clone(),
796        )
797        .await
798        {
799            Ok(output) => {
800                action_row.status = WorkflowActionRunStatus::Completed;
801                action_row.output = Some(output.clone());
802                action_row.updated_at_ms = now_ms();
803                state
804                    .update_workflow_run(&run.run_id, |row| {
805                        if let Some(target) = row
806                            .actions
807                            .iter_mut()
808                            .find(|item| item.action_id == action_row.action_id)
809                        {
810                            *target = action_row.clone();
811                        }
812                    })
813                    .await;
814                let _ = sync_workflow_automation_action_completed(
815                    state,
816                    &automation,
817                    automation_run.run_id.as_str(),
818                    &action_row.action_id,
819                    &output,
820                )
821                .await;
822                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
823                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
824                }
825                state.event_bus.publish(EngineEvent::new(
826                    "workflow.action.completed",
827                    json!({
828                        "runID": run.run_id,
829                        "workflowID": run.workflow_id,
830                        "actionID": action_row.action_id,
831                        "action": action_name,
832                        "taskID": run.task_id,
833                        "output": output,
834                    }),
835                ));
836            }
837            Err(error) => {
838                let detail = error.to_string();
839                action_row.status = WorkflowActionRunStatus::Failed;
840                action_row.detail = Some(detail.clone());
841                action_row.updated_at_ms = now_ms();
842                run.status = WorkflowRunStatus::Failed;
843                run.finished_at_ms = Some(now_ms());
844                state
845                    .update_workflow_run(&run.run_id, |row| {
846                        row.status = WorkflowRunStatus::Failed;
847                        row.finished_at_ms = Some(now_ms());
848                        if let Some(target) = row
849                            .actions
850                            .iter_mut()
851                            .find(|item| item.action_id == action_row.action_id)
852                        {
853                            *target = action_row.clone();
854                        }
855                    })
856                    .await;
857                let _ = sync_workflow_automation_action_failed(
858                    state,
859                    &automation,
860                    automation_run.run_id.as_str(),
861                    &action_row.action_id,
862                    &detail,
863                )
864                .await;
865                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
866                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
867                }
868                state.event_bus.publish(EngineEvent::new(
869                    "workflow.action.failed",
870                    json!({
871                        "runID": run.run_id,
872                        "workflowID": run.workflow_id,
873                        "actionID": action_row.action_id,
874                        "action": action_name,
875                        "taskID": run.task_id,
876                        "error": detail,
877                    }),
878                ));
879                state.event_bus.publish(EngineEvent::new(
880                    "workflow.run.failed",
881                    json!({
882                        "runID": run.run_id,
883                        "workflowID": run.workflow_id,
884                        "actionID": action_row.action_id,
885                        "taskID": run.task_id,
886                        "error": action_row.detail,
887                    }),
888                ));
889                return state.get_workflow_run(&run.run_id).await.with_context(|| {
890                    format!("workflow run `{}` missing after failure", run.run_id)
891                });
892            }
893        }
894    }
895    run.status = WorkflowRunStatus::Completed;
896    run.finished_at_ms = Some(now_ms());
897    let final_run = state
898        .update_workflow_run(&run.run_id, |row| {
899            row.status = WorkflowRunStatus::Completed;
900            row.finished_at_ms = Some(now_ms());
901        })
902        .await
903        .with_context(|| format!("workflow run `{}` missing on completion", run.run_id))?;
904    let _ = crate::http::sync_workflow_run_blackboard(state, &final_run).await;
905    state.event_bus.publish(EngineEvent::new(
906        "workflow.run.completed",
907        json!({
908            "runID": final_run.run_id,
909            "workflowID": final_run.workflow_id,
910            "bindingID": final_run.binding_id,
911            "taskID": final_run.task_id,
912        }),
913    ));
914    Ok(final_run)
915}
916
917async fn execute_action(
918    state: &AppState,
919    run_id: &str,
920    workflow_id: &str,
921    action_spec: &WorkflowActionSpec,
922    action_row: &WorkflowActionRunRecord,
923    trigger_event: Option<String>,
924) -> anyhow::Result<Value> {
925    let action_name = action_spec.action.as_str();
926    let parsed = parse_workflow_action(action_name);
927    match parsed {
928        ParsedWorkflowAction::EventEmit { event_type } => {
929            let payload = action_payload(action_spec, action_row);
930            state.event_bus.publish(EngineEvent::new(
931                event_type.clone(),
932                json!({
933                    "workflowID": workflow_id,
934                    "actionID": action_row.action_id,
935                    "triggerEvent": trigger_event,
936                    "payload": payload,
937                }),
938            ));
939            Ok(json!({ "eventType": event_type }))
940        }
941        ParsedWorkflowAction::ResourcePut { key } => {
942            let record = state
943                .put_shared_resource(
944                    key.clone(),
945                    action_payload(action_spec, action_row),
946                    None,
947                    "workflow".to_string(),
948                    None,
949                )
950                .await
951                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
952            Ok(json!({ "key": record.key, "rev": record.rev }))
953        }
954        ParsedWorkflowAction::ResourcePatch { key } => {
955            let current = state.get_shared_resource(&key).await;
956            let next_rev = current.as_ref().map(|row| row.rev);
957            let record = state
958                .put_shared_resource(
959                    key.clone(),
960                    merge_object(
961                        current.map(|row| row.value).unwrap_or_else(|| json!({})),
962                        action_payload(action_spec, action_row),
963                    ),
964                    next_rev,
965                    "workflow".to_string(),
966                    None,
967                )
968                .await
969                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
970            Ok(json!({ "key": record.key, "rev": record.rev }))
971        }
972        ParsedWorkflowAction::ResourceDelete { key } => {
973            let deleted = state
974                .delete_shared_resource(&key, None)
975                .await
976                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
977            Ok(json!({ "key": key, "deleted": deleted.is_some() }))
978        }
979        ParsedWorkflowAction::Tool { tool_name } => {
980            let payload = action_payload(action_spec, action_row);
981            let result = state.tools.execute(&tool_name, payload.clone()).await?;
982            let mut response = json!({
983                "tool": tool_name,
984                "output": result.output,
985                "metadata": result.metadata,
986            });
987            if let Some(external_action) = record_workflow_external_action(
988                state,
989                run_id,
990                workflow_id,
991                action_row,
992                trigger_event.clone(),
993                WorkflowExternalActionExecution::Tool {
994                    tool_name: tool_name.clone(),
995                },
996                &payload,
997                &response,
998            )
999            .await?
1000            {
1001                if let Some(obj) = response.as_object_mut() {
1002                    obj.insert("external_action".to_string(), external_action);
1003                }
1004            }
1005            Ok(response)
1006        }
1007        ParsedWorkflowAction::Capability { capability_id } => {
1008            let bindings = state.capability_resolver.list_bindings().await?;
1009            let tool_name = bindings
1010                .bindings
1011                .iter()
1012                .find(|binding| binding.capability_id == capability_id)
1013                .map(|binding| binding.tool_name.clone())
1014                .unwrap_or_else(|| capability_id.clone());
1015            let payload = action_payload(action_spec, action_row);
1016            let result = state.tools.execute(&tool_name, payload.clone()).await?;
1017            let mut response = json!({
1018                "capability": capability_id,
1019                "tool": tool_name,
1020                "output": result.output,
1021                "metadata": result.metadata,
1022            });
1023            if let Some(external_action) = record_workflow_external_action(
1024                state,
1025                run_id,
1026                workflow_id,
1027                action_row,
1028                trigger_event.clone(),
1029                WorkflowExternalActionExecution::Capability {
1030                    capability_id: capability_id.clone(),
1031                    tool_name: tool_name.clone(),
1032                },
1033                &payload,
1034                &response,
1035            )
1036            .await?
1037            {
1038                if let Some(obj) = response.as_object_mut() {
1039                    obj.insert("external_action".to_string(), external_action);
1040                }
1041            }
1042            Ok(response)
1043        }
1044        ParsedWorkflowAction::Workflow { workflow_id } => {
1045            anyhow::bail!("nested workflow action `{workflow_id}` is not supported in this slice")
1046        }
1047        ParsedWorkflowAction::Agent { agent_id } => {
1048            let workspace_root = state.workspace_index.snapshot().await.root;
1049            let session = Session::new(
1050                Some(format!("Workflow {} / {}", workflow_id, agent_id)),
1051                Some(workspace_root.clone()),
1052            );
1053            let session_id = session.id.clone();
1054            state.storage.save_session(session).await?;
1055            let prompt = action_spec
1056                .with
1057                .as_ref()
1058                .and_then(|v| v.get("prompt"))
1059                .and_then(|v| v.as_str())
1060                .map(ToString::to_string)
1061                .unwrap_or_else(|| format!("Execute workflow action `{}`", action_name));
1062            let request = SendMessageRequest {
1063                parts: vec![MessagePartInput::Text { text: prompt }],
1064                model: None,
1065                agent: Some(agent_id.clone()),
1066                tool_mode: None,
1067                tool_allowlist: None,
1068                context_mode: None,
1069                write_required: None,
1070                prewrite_requirements: None,
1071            };
1072            state
1073                .engine_loop
1074                .run_prompt_async_with_context(
1075                    session_id.clone(),
1076                    request,
1077                    Some(format!("workflow:{workflow_id}")),
1078                )
1079                .await?;
1080            Ok(json!({ "agentID": agent_id, "sessionID": session_id }))
1081        }
1082    }
1083}
1084
1085enum WorkflowExternalActionExecution {
1086    Tool {
1087        tool_name: String,
1088    },
1089    Capability {
1090        capability_id: String,
1091        tool_name: String,
1092    },
1093}
1094
1095async fn record_workflow_external_action(
1096    state: &AppState,
1097    run_id: &str,
1098    workflow_id: &str,
1099    action_row: &WorkflowActionRunRecord,
1100    trigger_event: Option<String>,
1101    execution: WorkflowExternalActionExecution,
1102    payload: &Value,
1103    result: &Value,
1104) -> anyhow::Result<Option<Value>> {
1105    let bindings = state.capability_resolver.list_bindings().await?;
1106    let binding = match execution {
1107        WorkflowExternalActionExecution::Tool { ref tool_name } => bindings
1108            .bindings
1109            .iter()
1110            .find(|binding| workflow_binding_matches_tool_name(binding, tool_name)),
1111        WorkflowExternalActionExecution::Capability {
1112            ref capability_id,
1113            ref tool_name,
1114        } => bindings.bindings.iter().find(|binding| {
1115            binding.capability_id == *capability_id
1116                && workflow_binding_matches_tool_name(binding, tool_name)
1117        }),
1118    };
1119    let Some(binding) = binding else {
1120        return Ok(None);
1121    };
1122
1123    let target = workflow_external_action_target(payload, result);
1124    let source_id = format!("{run_id}:{}", action_row.action_id);
1125    let idempotency_key = crate::sha256_hex(&[
1126        workflow_id,
1127        run_id,
1128        &action_row.action_id,
1129        &action_row.action,
1130        &payload.to_string(),
1131    ]);
1132    let action = crate::ExternalActionRecord {
1133        action_id: format!("workflow-external-{}", &idempotency_key[..16]),
1134        operation: binding.capability_id.clone(),
1135        status: "posted".to_string(),
1136        source_kind: Some("workflow".to_string()),
1137        source_id: Some(source_id.clone()),
1138        routine_run_id: None,
1139        context_run_id: Some(crate::http::context_runs::workflow_context_run_id(run_id)),
1140        capability_id: Some(binding.capability_id.clone()),
1141        provider: Some(binding.provider.clone()),
1142        target,
1143        approval_state: Some("executed".to_string()),
1144        idempotency_key: Some(idempotency_key),
1145        receipt: Some(result.clone()),
1146        error: None,
1147        metadata: Some(json!({
1148            "workflowID": workflow_id,
1149            "workflowRunID": run_id,
1150            "actionID": action_row.action_id,
1151            "action": action_row.action,
1152            "taskID": action_row.task_id,
1153            "triggerEvent": trigger_event,
1154            "tool": binding.tool_name,
1155            "provider": binding.provider,
1156            "input": payload,
1157        })),
1158        created_at_ms: action_row.updated_at_ms,
1159        updated_at_ms: action_row.updated_at_ms,
1160    };
1161    let recorded = state.record_external_action(action).await?;
1162    Ok(Some(serde_json::to_value(&recorded)?))
1163}
1164
1165fn workflow_binding_matches_tool_name(
1166    binding: &crate::capability_resolver::CapabilityBinding,
1167    tool_name: &str,
1168) -> bool {
1169    binding.tool_name.eq_ignore_ascii_case(tool_name)
1170        || binding
1171            .tool_name_aliases
1172            .iter()
1173            .any(|alias| alias.eq_ignore_ascii_case(tool_name))
1174}
1175
1176fn workflow_external_action_target(payload: &Value, result: &Value) -> Option<String> {
1177    for candidate in [
1178        payload.pointer("/owner_repo").and_then(Value::as_str),
1179        payload.pointer("/repo").and_then(Value::as_str),
1180        payload.pointer("/repository").and_then(Value::as_str),
1181        payload.pointer("/channel").and_then(Value::as_str),
1182        payload.pointer("/channel_id").and_then(Value::as_str),
1183        payload.pointer("/thread_ts").and_then(Value::as_str),
1184        result.pointer("/metadata/channel").and_then(Value::as_str),
1185        result.pointer("/metadata/repo").and_then(Value::as_str),
1186    ] {
1187        let trimmed = candidate.map(str::trim).unwrap_or_default();
1188        if !trimmed.is_empty() {
1189            return Some(trimmed.to_string());
1190        }
1191    }
1192    None
1193}
1194
1195fn action_payload(action_spec: &WorkflowActionSpec, action_row: &WorkflowActionRunRecord) -> Value {
1196    action_spec
1197        .with
1198        .clone()
1199        .unwrap_or_else(|| json!({ "action_id": action_row.action_id }))
1200}
1201
1202fn merge_object(current: Value, patch: Value) -> Value {
1203    if let (Some(mut current_obj), Some(patch_obj)) =
1204        (current.as_object().cloned(), patch.as_object())
1205    {
1206        for (key, value) in patch_obj {
1207            current_obj.insert(key.clone(), value.clone());
1208        }
1209        Value::Object(current_obj)
1210    } else {
1211        patch
1212    }
1213}
1214
1215fn source_event_id(event: &EngineEvent) -> String {
1216    if let Some(id) = event.properties.get("event_id").and_then(|v| v.as_str()) {
1217        return id.to_string();
1218    }
1219    for key in ["runID", "runId", "task_id", "taskID", "sessionID"] {
1220        if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1221            return format!("{}:{id}", event.event_type);
1222        }
1223    }
1224    format!("{}:{}", event.event_type, event.properties)
1225}
1226
1227fn task_id_from_event(event: &EngineEvent) -> Option<String> {
1228    for key in ["task_id", "taskID", "step_id", "stepID"] {
1229        if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1230            let trimmed = id.trim();
1231            if !trimmed.is_empty() {
1232                return Some(trimmed.to_string());
1233            }
1234        }
1235    }
1236    None
1237}
1238
1239fn event_name_matches(expected: &str, actual: &str) -> bool {
1240    expected.trim().eq_ignore_ascii_case(actual.trim())
1241}