1use anyhow::Context;
2use serde_json::{json, Value};
3use tandem_types::{EngineEvent, MessagePartInput, SendMessageRequest, Session, TenantContext};
4use tandem_workflows::{
5 WorkflowActionRunRecord, WorkflowActionRunStatus, WorkflowActionSpec, WorkflowHookBinding,
6 WorkflowRunRecord, WorkflowRunStatus, WorkflowSimulationResult, WorkflowSpec,
7};
8use uuid::Uuid;
9
10use crate::{now_ms, AppState, WorkflowSourceRef};
11
/// A workflow action paired with the identifier used to track it inside a run.
#[derive(Debug, Clone)]
struct PreparedWorkflowAction {
    /// Identifier of the action within the run: the step id for workflow steps,
    /// or a positional `action_N` label for hook-binding actions.
    action_id: String,
    /// The action string and its optional `with` payload.
    spec: WorkflowActionSpec,
}
17
18fn workflow_event_tenant_context_value(tenant_context: &TenantContext) -> Value {
19 serde_json::to_value(tenant_context).unwrap_or_else(|_| json!(tenant_context))
20}
21
22fn workflow_event_with_tenant_context(mut payload: Value, tenant_context: &TenantContext) -> Value {
23 if let Some(map) = payload.as_object_mut() {
24 map.entry("tenantContext".to_string())
25 .or_insert_with(|| workflow_event_tenant_context_value(tenant_context));
26 }
27 payload
28}
29
30fn event_tenant_context(event: &EngineEvent) -> TenantContext {
31 event
32 .properties
33 .get("tenantContext")
34 .and_then(|value| serde_json::from_value(value.clone()).ok())
35 .unwrap_or_else(TenantContext::local_implicit)
36}
37
38fn workflow_action_objective(action: &str, with: Option<&Value>) -> String {
39 match with {
40 Some(with) if !with.is_null() => {
41 format!("Execute workflow action `{action}` with payload {with}.")
42 }
43 _ => format!("Execute workflow action `{action}`."),
44 }
45}
46
47fn workflow_manual_schedule() -> crate::AutomationV2Schedule {
48 crate::AutomationV2Schedule {
49 schedule_type: crate::AutomationV2ScheduleType::Manual,
50 cron_expression: None,
51 interval_seconds: None,
52 timezone: "UTC".to_string(),
53 misfire_policy: crate::RoutineMisfirePolicy::RunOnce,
54 }
55}
56
57fn workflow_execution_plan(
58 workflow_id: &str,
59 name: &str,
60 description: Option<String>,
61 actions: &[PreparedWorkflowAction],
62 source_label: &str,
63 source: Option<&WorkflowSourceRef>,
64 trigger_event: Option<&str>,
65) -> crate::WorkflowPlan {
66 crate::WorkflowPlan {
67 plan_id: format!("workflow-plan-{workflow_id}"),
68 planner_version: "workflow_runtime_v1".to_string(),
69 plan_source: source_label.to_string(),
70 original_prompt: description.clone().unwrap_or_else(|| name.to_string()),
71 normalized_prompt: description.clone().unwrap_or_else(|| name.to_string()),
72 confidence: "high".to_string(),
73 title: name.to_string(),
74 description,
75 schedule: workflow_manual_schedule(),
76 execution_target: "automation_v2".to_string(),
77 workspace_root: std::env::current_dir()
78 .unwrap_or_else(|_| std::path::PathBuf::from("."))
79 .to_string_lossy()
80 .to_string(),
81 steps: actions
82 .iter()
83 .map(|action| crate::WorkflowPlanStep {
84 step_id: action.action_id.clone(),
85 kind: "workflow_action".to_string(),
86 objective: workflow_action_objective(
87 &action.spec.action,
88 action.spec.with.as_ref(),
89 ),
90 depends_on: Vec::new(),
91 agent_role: "operator".to_string(),
92 input_refs: Vec::new(),
93 output_contract: Some(crate::AutomationFlowOutputContract {
94 kind: "generic_artifact".to_string(),
95 validator: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
96 enforcement: None,
97 schema: None,
98 summary_guidance: None,
99 }),
100 metadata: None,
101 })
102 .collect(),
103 requires_integrations: Vec::new(),
104 allowed_mcp_servers: Vec::new(),
105 operator_preferences: Some(json!({
106 "source": source_label,
107 "tool_access_mode": "auto",
108 })),
109 save_options: json!({
110 "origin": source_label,
111 "workflow_source": source,
112 "trigger_event": trigger_event,
113 }),
114 }
115}
116
117pub(crate) fn compile_workflow_spec_to_automation_preview(
118 workflow: &WorkflowSpec,
119) -> crate::AutomationV2Spec {
120 let actions = workflow
121 .steps
122 .iter()
123 .map(|step| PreparedWorkflowAction {
124 action_id: step.step_id.clone(),
125 spec: WorkflowActionSpec {
126 action: step.action.clone(),
127 with: step.with.clone(),
128 },
129 })
130 .collect::<Vec<_>>();
131 let mut automation = crate::http::compile_plan_to_automation_v2(
132 &workflow_execution_plan(
133 &workflow.workflow_id,
134 &workflow.name,
135 workflow.description.clone(),
136 &actions,
137 "workflow_registry",
138 workflow.source.as_ref(),
139 None,
140 ),
141 None,
142 "workflow_registry",
143 );
144 if let Some(metadata) = automation.metadata.as_mut().and_then(Value::as_object_mut) {
145 metadata.insert("workflow_id".to_string(), json!(workflow.workflow_id));
146 metadata.insert("workflow_name".to_string(), json!(workflow.name));
147 metadata.insert("workflow_source".to_string(), json!(workflow.source));
148 metadata.insert("workflow_enabled".to_string(), json!(workflow.enabled));
149 }
150 automation
151}
152
153fn compile_workflow_run_automation(
154 workflow_id: &str,
155 workflow_name: Option<&str>,
156 workflow_description: Option<&str>,
157 binding_id: Option<&str>,
158 actions: &[PreparedWorkflowAction],
159 source: Option<&WorkflowSourceRef>,
160 trigger_event: Option<&str>,
161) -> crate::AutomationV2Spec {
162 let automation_id = binding_id
163 .map(|binding| format!("workflow-hook-automation-{workflow_id}-{binding}"))
164 .unwrap_or_else(|| format!("workflow-automation-{workflow_id}"));
165 let title = binding_id
166 .map(|binding| {
167 workflow_name
168 .map(|name| format!("{name} hook {binding}"))
169 .unwrap_or_else(|| format!("Workflow Hook {workflow_id}:{binding}"))
170 })
171 .unwrap_or_else(|| {
172 workflow_name
173 .map(|name| format!("{name} execution"))
174 .unwrap_or_else(|| format!("Workflow {workflow_id}"))
175 });
176 let mut automation = crate::http::compile_plan_to_automation_v2(
177 &workflow_execution_plan(
178 &automation_id,
179 &title,
180 Some(
181 workflow_description
182 .map(|description| description.to_string())
183 .unwrap_or_else(|| format!("Mirrored workflow execution for `{workflow_id}`.")),
184 ),
185 actions,
186 "workflow_runtime",
187 source,
188 trigger_event,
189 ),
190 None,
191 "workflow_runtime",
192 );
193 automation.automation_id = automation_id.clone();
194 automation.name = title;
195 automation.metadata = Some(json!({
196 "workflow_id": workflow_id,
197 "binding_id": binding_id,
198 "workflow_source": source,
199 "trigger_event": trigger_event,
200 "origin": "workflow_runtime_mirror",
201 }));
202 automation
203}
204
205async fn sync_workflow_automation_run_start(
206 state: &AppState,
207 automation: &crate::AutomationV2Spec,
208 run_id: &str,
209) -> anyhow::Result<crate::AutomationV2RunRecord> {
210 let updated = state
211 .update_automation_v2_run(run_id, |run| {
212 run.status = crate::AutomationRunStatus::Running;
213 run.started_at_ms.get_or_insert_with(now_ms);
214 crate::app::state::automation::lifecycle::record_automation_lifecycle_event(
215 run,
216 "workflow_run_started",
217 Some("workflow runtime mirror started".to_string()),
218 None,
219 );
220 crate::app::state::refresh_automation_runtime_state(automation, run);
221 })
222 .await
223 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
224 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
225 .await
226 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
227 Ok(updated)
228}
229
230async fn sync_workflow_automation_action_started(
231 state: &AppState,
232 automation: &crate::AutomationV2Spec,
233 run_id: &str,
234 action_id: &str,
235) -> anyhow::Result<crate::AutomationV2RunRecord> {
236 let updated = state
237 .update_automation_v2_run(run_id, |run| {
238 let next_attempt = run
239 .checkpoint
240 .node_attempts
241 .get(action_id)
242 .copied()
243 .unwrap_or(0)
244 .saturating_add(1);
245 run.checkpoint
246 .node_attempts
247 .insert(action_id.to_string(), next_attempt);
248 run.detail = Some(format!("Running workflow action `{action_id}`"));
249 crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
250 run,
251 "workflow_action_started",
252 Some(format!("workflow action `{action_id}` started")),
253 None,
254 Some(json!({
255 "action_id": action_id,
256 "attempt": next_attempt,
257 })),
258 );
259 crate::app::state::refresh_automation_runtime_state(automation, run);
260 })
261 .await
262 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
263 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
264 .await
265 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
266 Ok(updated)
267}
268
269async fn sync_workflow_automation_action_completed(
270 state: &AppState,
271 automation: &crate::AutomationV2Spec,
272 run_id: &str,
273 action_id: &str,
274 output: &Value,
275) -> anyhow::Result<crate::AutomationV2RunRecord> {
276 let action_count = automation.flow.nodes.len();
277 let updated = state
278 .update_automation_v2_run(run_id, |run| {
279 run.checkpoint.pending_nodes.retain(|id| id != action_id);
280 if !run
281 .checkpoint
282 .completed_nodes
283 .iter()
284 .any(|id| id == action_id)
285 {
286 run.checkpoint.completed_nodes.push(action_id.to_string());
287 }
288 run.checkpoint.node_outputs.insert(
289 action_id.to_string(),
290 json!(crate::AutomationNodeOutput {
291 contract_kind: "generic_artifact".to_string(),
292 validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
293 validator_summary: Some(crate::AutomationValidatorSummary {
294 kind: crate::AutomationOutputValidatorKind::GenericArtifact,
295 outcome: "accepted".to_string(),
296 reason: Some("workflow action completed".to_string()),
297 unmet_requirements: Vec::new(),
298 warning_requirements: Vec::new(),
299 warning_count: 0,
300 accepted_candidate_source: Some("workflow_runtime".to_string()),
301 verification_outcome: Some("not_applicable".to_string()),
302 validation_basis: None,
303 repair_attempted: false,
304 repair_attempt: 0,
305 repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
306 as u32,
307 repair_succeeded: false,
308 repair_exhausted: false,
309 }),
310 summary: format!("Workflow action `{action_id}` completed"),
311 content: json!({
312 "action_id": action_id,
313 "output": output,
314 }),
315 created_at_ms: now_ms(),
316 node_id: action_id.to_string(),
317 status: Some("completed".to_string()),
318 blocked_reason: None,
319 approved: None,
320 workflow_class: Some("workflow_action".to_string()),
321 phase: Some("execution".to_string()),
322 failure_kind: None,
323 tool_telemetry: None,
324 preflight: None,
325 knowledge_preflight: None,
326 capability_resolution: None,
327 attempt_evidence: None,
328 blocker_category: None,
329 fallback_used: None,
330 artifact_validation: None,
331 receipt_timeline: None,
332 quality_mode: Some("strict_research_v1".to_string()),
333 requested_quality_mode: None,
334 emergency_rollback_enabled: Some(false),
335 provenance: Some(crate::AutomationNodeOutputProvenance {
336 session_id: format!("workflow-runtime-{run_id}"),
337 node_id: action_id.to_string(),
338 run_id: Some(run_id.to_string()),
339 output_path: None,
340 content_digest: None,
341 accepted_candidate_source: Some("workflow_runtime".to_string()),
342 validation_outcome: Some("not_applicable".to_string()),
343 repair_attempt: Some(0),
344 repair_succeeded: Some(false),
345 reuse_allowed: Some(false),
346 freshness: crate::AutomationNodeOutputFreshness {
347 current_run: true,
348 current_attempt: true,
349 },
350 }),
351 }),
352 );
353 if run.checkpoint.completed_nodes.len() >= action_count {
354 run.status = crate::AutomationRunStatus::Completed;
355 run.detail = Some("workflow runtime mirror completed".to_string());
356 }
357 crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
358 run,
359 "workflow_action_completed",
360 Some(format!("workflow action `{action_id}` completed")),
361 None,
362 Some(json!({
363 "action_id": action_id,
364 })),
365 );
366 crate::app::state::refresh_automation_runtime_state(automation, run);
367 })
368 .await
369 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
370 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
371 .await
372 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
373 Ok(updated)
374}
375
376async fn sync_workflow_automation_action_failed(
377 state: &AppState,
378 automation: &crate::AutomationV2Spec,
379 run_id: &str,
380 action_id: &str,
381 error: &str,
382) -> anyhow::Result<crate::AutomationV2RunRecord> {
383 let updated = state
384 .update_automation_v2_run(run_id, |run| {
385 run.status = crate::AutomationRunStatus::Failed;
386 run.detail = Some(format!("Workflow action `{action_id}` failed"));
387 run.checkpoint.pending_nodes.retain(|id| id != action_id);
388 run.checkpoint.last_failure = Some(crate::AutomationFailureRecord {
389 node_id: action_id.to_string(),
390 reason: error.to_string(),
391 failed_at_ms: now_ms(),
392 });
393 run.checkpoint.node_outputs.insert(
394 action_id.to_string(),
395 json!(crate::AutomationNodeOutput {
396 contract_kind: "generic_artifact".to_string(),
397 validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
398 validator_summary: Some(crate::AutomationValidatorSummary {
399 kind: crate::AutomationOutputValidatorKind::GenericArtifact,
400 outcome: "rejected".to_string(),
401 reason: Some(error.to_string()),
402 unmet_requirements: vec![error.to_string()],
403 warning_requirements: Vec::new(),
404 warning_count: 0,
405 accepted_candidate_source: None,
406 verification_outcome: Some("failed".to_string()),
407 validation_basis: None,
408 repair_attempted: false,
409 repair_attempt: 0,
410 repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
411 as u32,
412 repair_succeeded: false,
413 repair_exhausted: false,
414 }),
415 summary: format!("Workflow action `{action_id}` failed"),
416 content: json!({
417 "action_id": action_id,
418 "error": error,
419 }),
420 created_at_ms: now_ms(),
421 node_id: action_id.to_string(),
422 status: Some("failed".to_string()),
423 blocked_reason: Some(error.to_string()),
424 approved: None,
425 workflow_class: Some("workflow_action".to_string()),
426 phase: Some("execution".to_string()),
427 failure_kind: Some("workflow_action_failed".to_string()),
428 tool_telemetry: None,
429 preflight: None,
430 knowledge_preflight: None,
431 capability_resolution: None,
432 attempt_evidence: None,
433 blocker_category: Some("tool_result_unusable".to_string()),
434 fallback_used: None,
435 artifact_validation: None,
436 receipt_timeline: None,
437 quality_mode: Some("strict_research_v1".to_string()),
438 requested_quality_mode: None,
439 emergency_rollback_enabled: Some(false),
440 provenance: Some(crate::AutomationNodeOutputProvenance {
441 session_id: format!("workflow-runtime-{run_id}"),
442 node_id: action_id.to_string(),
443 run_id: Some(run_id.to_string()),
444 output_path: None,
445 content_digest: None,
446 accepted_candidate_source: None,
447 validation_outcome: Some("failed".to_string()),
448 repair_attempt: Some(0),
449 repair_succeeded: Some(false),
450 reuse_allowed: Some(false),
451 freshness: crate::AutomationNodeOutputFreshness {
452 current_run: true,
453 current_attempt: true,
454 },
455 }),
456 }),
457 );
458 crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
459 run,
460 "workflow_action_failed",
461 Some(format!("workflow action `{action_id}` failed")),
462 None,
463 Some(json!({
464 "action_id": action_id,
465 "error": error,
466 })),
467 );
468 crate::app::state::refresh_automation_runtime_state(automation, run);
469 })
470 .await
471 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
472 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
473 .await
474 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
475 Ok(updated)
476}
477
/// The structured form of a workflow action string, as produced by
/// `parse_workflow_action`.
#[derive(Debug, Clone)]
pub enum ParsedWorkflowAction {
    /// `event:<type>` — emit an event of the given type.
    EventEmit { event_type: String },
    /// `resource:put:<key>` — write a shared resource.
    ResourcePut { key: String },
    /// `resource:patch:<key>` — patch a shared resource.
    ResourcePatch { key: String },
    /// `resource:delete:<key>` — delete a shared resource.
    ResourceDelete { key: String },
    /// `tool:<name>` — run a tool by name.
    Tool { tool_name: String },
    /// `capability:<id>`, or any action string without a recognised prefix.
    Capability { capability_id: String },
    /// `workflow:<id>` — reference to another workflow.
    Workflow { workflow_id: String },
    /// `agent:<id>` — reference to an agent.
    Agent { agent_id: String },
}
489
490pub fn parse_workflow_action(action: &str) -> ParsedWorkflowAction {
491 let trimmed = action.trim();
492 if let Some(rest) = trimmed.strip_prefix("event:") {
493 return ParsedWorkflowAction::EventEmit {
494 event_type: rest.trim().to_string(),
495 };
496 }
497 if let Some(rest) = trimmed.strip_prefix("resource:put:") {
498 return ParsedWorkflowAction::ResourcePut {
499 key: rest.trim().to_string(),
500 };
501 }
502 if let Some(rest) = trimmed.strip_prefix("resource:patch:") {
503 return ParsedWorkflowAction::ResourcePatch {
504 key: rest.trim().to_string(),
505 };
506 }
507 if let Some(rest) = trimmed.strip_prefix("resource:delete:") {
508 return ParsedWorkflowAction::ResourceDelete {
509 key: rest.trim().to_string(),
510 };
511 }
512 if let Some(rest) = trimmed.strip_prefix("tool:") {
513 return ParsedWorkflowAction::Tool {
514 tool_name: rest.trim().to_string(),
515 };
516 }
517 if let Some(rest) = trimmed.strip_prefix("capability:") {
518 return ParsedWorkflowAction::Capability {
519 capability_id: rest.trim().to_string(),
520 };
521 }
522 if let Some(rest) = trimmed.strip_prefix("workflow:") {
523 return ParsedWorkflowAction::Workflow {
524 workflow_id: rest.trim().to_string(),
525 };
526 }
527 if let Some(rest) = trimmed.strip_prefix("agent:") {
528 return ParsedWorkflowAction::Agent {
529 agent_id: rest.trim().to_string(),
530 };
531 }
532 ParsedWorkflowAction::Capability {
533 capability_id: trimmed.to_string(),
534 }
535}
536
537pub fn canonical_workflow_event_names(event: &EngineEvent) -> Vec<String> {
538 let mut names = vec![event.event_type.clone(), event.event_type.replace('.', "_")];
539 match event.event_type.as_str() {
540 "context.task.created" => names.push("task_created".to_string()),
541 "context.task.started" => names.push("task_started".to_string()),
542 "context.task.completed" => names.push("task_completed".to_string()),
543 "context.task.failed" => names.push("task_failed".to_string()),
544 "workflow.run.started" | "routine.run.created" => {
545 names.push("workflow_started".to_string())
546 }
547 "workflow.run.completed" | "routine.run.completed" => {
548 names.push("workflow_completed".to_string())
549 }
550 "workflow.run.failed" | "routine.run.failed" => names.push("task_failed".to_string()),
551 _ => {}
552 }
553 names.sort();
554 names.dedup();
555 names
556}
557
558pub async fn simulate_workflow_event(
559 state: &AppState,
560 event: &EngineEvent,
561) -> WorkflowSimulationResult {
562 let registry = state.workflow_registry().await;
563 let canonical = canonical_workflow_event_names(event);
564 let matched_bindings = registry
565 .hooks
566 .into_iter()
567 .filter(|hook| {
568 hook.enabled
569 && canonical
570 .iter()
571 .any(|name| event_name_matches(&hook.event, name))
572 })
573 .collect::<Vec<_>>();
574 let planned_actions = matched_bindings
575 .iter()
576 .flat_map(|hook| hook.actions.clone())
577 .collect::<Vec<_>>();
578 WorkflowSimulationResult {
579 matched_bindings,
580 planned_actions,
581 canonical_events: canonical,
582 }
583}
584
585pub async fn dispatch_workflow_event(state: &AppState, event: &EngineEvent) {
586 let simulation = simulate_workflow_event(state, event).await;
587 if simulation.matched_bindings.is_empty() {
588 return;
589 }
590 for hook in simulation.matched_bindings {
591 let source_event_id = source_event_id(event);
592 let task_id = task_id_from_event(event);
593 let tenant_context = event_tenant_context(event);
594 let dedupe_key = format!("{}::{source_event_id}", hook.binding_id);
595 {
596 let mut seen = state.workflow_dispatch_seen.write().await;
597 if seen.contains_key(&dedupe_key) {
598 continue;
599 }
600 seen.insert(dedupe_key, now_ms());
601 }
602 let _ = execute_hook_binding(
603 state,
604 &hook,
605 tenant_context.clone(),
606 Some(event.event_type.clone()),
607 Some(source_event_id),
608 task_id,
609 false,
610 )
611 .await;
612 }
613}
614
615pub async fn run_workflow_dispatcher(state: AppState) {
616 let mut rx = state.event_bus.subscribe();
617 loop {
618 match rx.recv().await {
619 Ok(event) => dispatch_workflow_event(&state, &event).await,
620 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
621 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
622 }
623 }
624}
625
626pub async fn execute_workflow(
627 state: &AppState,
628 workflow: &WorkflowSpec,
629 tenant_context: TenantContext,
630 trigger_event: Option<String>,
631 source_event_id: Option<String>,
632 task_id: Option<String>,
633 dry_run: bool,
634) -> anyhow::Result<WorkflowRunRecord> {
635 let actions = workflow
636 .steps
637 .iter()
638 .map(|step| PreparedWorkflowAction {
639 action_id: step.step_id.clone(),
640 spec: WorkflowActionSpec {
641 action: step.action.clone(),
642 with: step.with.clone(),
643 },
644 })
645 .collect::<Vec<_>>();
646 execute_actions(
647 state,
648 &workflow.workflow_id,
649 None,
650 actions,
651 Some(workflow.name.clone()),
652 workflow.description.clone(),
653 workflow.source.clone(),
654 tenant_context,
655 trigger_event,
656 source_event_id,
657 task_id,
658 dry_run,
659 )
660 .await
661}
662
663pub async fn execute_hook_binding(
664 state: &AppState,
665 hook: &WorkflowHookBinding,
666 tenant_context: TenantContext,
667 trigger_event: Option<String>,
668 source_event_id: Option<String>,
669 task_id: Option<String>,
670 dry_run: bool,
671) -> anyhow::Result<WorkflowRunRecord> {
672 let workflow = state
673 .get_workflow(&hook.workflow_id)
674 .await
675 .with_context(|| format!("unknown workflow `{}`", hook.workflow_id))?;
676 execute_actions(
677 state,
678 &hook.workflow_id,
679 Some(hook.binding_id.clone()),
680 hook.actions
681 .iter()
682 .enumerate()
683 .map(|(idx, action)| PreparedWorkflowAction {
684 action_id: format!("action_{}", idx + 1),
685 spec: action.clone(),
686 })
687 .collect(),
688 None,
689 None,
690 workflow.source,
691 tenant_context,
692 trigger_event,
693 source_event_id,
694 task_id,
695 dry_run,
696 )
697 .await
698}
699
/// Executes a list of prepared actions as one workflow run, mirroring progress onto an
/// automation run and publishing lifecycle events on the event bus.
///
/// Flow:
/// 1. Compile and store the mirror automation, create its run, and mark it started.
/// 2. Persist a `WorkflowRunRecord` and publish `workflow.run.started`.
/// 3. For a dry run, return right after that with every action marked skipped.
/// 4. Otherwise run the actions sequentially, persisting and publishing the state of
///    each one. The first failing action marks the run failed and stops execution.
/// 5. When every action succeeds, mark the run completed and publish
///    `workflow.run.completed`.
///
/// Blackboard syncs and mirror-automation action updates are best-effort: their
/// errors are ignored.
///
/// # Errors
///
/// Returns an error when storing the automation, creating or starting its run, or
/// storing the workflow run fails, or when the run record is missing from state after
/// a failure or on completion. A failing *action* does not produce an `Err`: the stored
/// run record (status failed) is returned as `Ok`.
async fn execute_actions(
    state: &AppState,
    workflow_id: &str,
    binding_id: Option<String>,
    actions: Vec<PreparedWorkflowAction>,
    workflow_name: Option<String>,
    workflow_description: Option<String>,
    source: Option<WorkflowSourceRef>,
    tenant_context: TenantContext,
    trigger_event: Option<String>,
    source_event_id: Option<String>,
    task_id: Option<String>,
    dry_run: bool,
) -> anyhow::Result<WorkflowRunRecord> {
    let run_id = format!("workflow-run-{}", Uuid::new_v4());
    let now = now_ms();
    // Build and register the automation that mirrors this run.
    let automation = compile_workflow_run_automation(
        workflow_id,
        workflow_name.as_deref(),
        workflow_description.as_deref(),
        binding_id.as_deref(),
        &actions,
        source.as_ref(),
        trigger_event.as_deref(),
    );
    let automation = state.put_automation_v2(automation).await?;
    // The trigger label falls back to "workflow" when no trigger event is given.
    let automation_run = state
        .create_automation_v2_run(&automation, trigger_event.as_deref().unwrap_or("workflow"))
        .await?;
    let automation_run =
        sync_workflow_automation_run_start(state, &automation, &automation_run.run_id).await?;
    // Initial run record: dry runs are already finished, with every action skipped.
    let mut run = WorkflowRunRecord {
        run_id: run_id.clone(),
        workflow_id: workflow_id.to_string(),
        tenant_context: tenant_context.clone(),
        automation_id: Some(automation.automation_id.clone()),
        automation_run_id: Some(automation_run.run_id.clone()),
        binding_id,
        trigger_event: trigger_event.clone(),
        source_event_id: source_event_id.clone(),
        task_id: task_id.clone(),
        status: if dry_run {
            WorkflowRunStatus::DryRun
        } else {
            WorkflowRunStatus::Running
        },
        created_at_ms: now,
        updated_at_ms: now,
        finished_at_ms: if dry_run { Some(now) } else { None },
        actions: actions
            .iter()
            .map(|action| WorkflowActionRunRecord {
                action_id: action.action_id.clone(),
                action: action.spec.action.clone(),
                task_id: task_id.clone(),
                status: if dry_run {
                    WorkflowActionRunStatus::Skipped
                } else {
                    WorkflowActionRunStatus::Pending
                },
                detail: None,
                output: None,
                updated_at_ms: now,
            })
            .collect(),
        source,
    };
    state.put_workflow_run(run.clone()).await?;
    // Best-effort blackboard sync; a failure here does not abort the run.
    let _ = crate::http::sync_workflow_run_blackboard(state, &run).await;
    state.event_bus.publish(EngineEvent::new(
        "workflow.run.started",
        workflow_event_with_tenant_context(
            json!({
                "runID": run.run_id,
                "workflowID": run.workflow_id,
                "bindingID": run.binding_id,
                "triggerEvent": trigger_event,
                "sourceEventID": source_event_id,
                "taskID": task_id,
                "dryRun": dry_run,
            }),
            &run.tenant_context,
        ),
    ));
    if dry_run {
        // NOTE(review): the mirror automation run was set to running above and is not
        // updated on this path — confirm that is intended for dry runs.
        return Ok(run);
    }
    // Execute the actions one at a time, in order.
    for (action_row, action_spec) in run.actions.iter_mut().zip(actions.iter()) {
        action_row.status = WorkflowActionRunStatus::Running;
        action_row.updated_at_ms = now_ms();
        let action_name = action_row.action.clone();
        let _ = sync_workflow_automation_action_started(
            state,
            &automation,
            automation_run.run_id.as_str(),
            &action_row.action_id,
        )
        .await;
        // Copy the local action row into the stored run record.
        state
            .update_workflow_run(&run.run_id, |row| {
                if let Some(target) = row
                    .actions
                    .iter_mut()
                    .find(|item| item.action_id == action_row.action_id)
                {
                    *target = action_row.clone();
                }
            })
            .await;
        if let Some(latest) = state.get_workflow_run(&run.run_id).await {
            let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
        }
        state.event_bus.publish(EngineEvent::new(
            "workflow.action.started",
            workflow_event_with_tenant_context(
                json!({
                    "runID": run.run_id,
                    "workflowID": run.workflow_id,
                    "actionID": action_row.action_id,
                    "action": action_name,
                    "taskID": run.task_id,
                }),
                &run.tenant_context,
            ),
        ));
        match execute_action(
            state,
            &run.run_id,
            workflow_id,
            &action_spec.spec,
            action_row,
            &run.tenant_context,
            trigger_event.clone(),
        )
        .await
        {
            Ok(output) => {
                // Success: store the output, mirror it, and announce completion.
                action_row.status = WorkflowActionRunStatus::Completed;
                action_row.output = Some(output.clone());
                action_row.updated_at_ms = now_ms();
                state
                    .update_workflow_run(&run.run_id, |row| {
                        if let Some(target) = row
                            .actions
                            .iter_mut()
                            .find(|item| item.action_id == action_row.action_id)
                        {
                            *target = action_row.clone();
                        }
                    })
                    .await;
                let _ = sync_workflow_automation_action_completed(
                    state,
                    &automation,
                    automation_run.run_id.as_str(),
                    &action_row.action_id,
                    &output,
                )
                .await;
                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
                }
                state.event_bus.publish(EngineEvent::new(
                    "workflow.action.completed",
                    workflow_event_with_tenant_context(
                        json!({
                            "runID": run.run_id,
                            "workflowID": run.workflow_id,
                            "actionID": action_row.action_id,
                            "action": action_name,
                            "taskID": run.task_id,
                            "output": output,
                        }),
                        &run.tenant_context,
                    ),
                ));
            }
            Err(error) => {
                // Failure: mark both the action and the run failed, publish the
                // failure events, and stop — later actions are not executed.
                let detail = error.to_string();
                action_row.status = WorkflowActionRunStatus::Failed;
                action_row.detail = Some(detail.clone());
                action_row.updated_at_ms = now_ms();
                run.status = WorkflowRunStatus::Failed;
                run.finished_at_ms = Some(now_ms());
                state
                    .update_workflow_run(&run.run_id, |row| {
                        row.status = WorkflowRunStatus::Failed;
                        row.finished_at_ms = Some(now_ms());
                        if let Some(target) = row
                            .actions
                            .iter_mut()
                            .find(|item| item.action_id == action_row.action_id)
                        {
                            *target = action_row.clone();
                        }
                    })
                    .await;
                let _ = sync_workflow_automation_action_failed(
                    state,
                    &automation,
                    automation_run.run_id.as_str(),
                    &action_row.action_id,
                    &detail,
                )
                .await;
                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
                }
                state.event_bus.publish(EngineEvent::new(
                    "workflow.action.failed",
                    workflow_event_with_tenant_context(
                        json!({
                            "runID": run.run_id,
                            "workflowID": run.workflow_id,
                            "actionID": action_row.action_id,
                            "action": action_name,
                            "taskID": run.task_id,
                            "error": detail,
                        }),
                        &run.tenant_context,
                    ),
                ));
                state.event_bus.publish(EngineEvent::new(
                    "workflow.run.failed",
                    workflow_event_with_tenant_context(
                        json!({
                            "runID": run.run_id,
                            "workflowID": run.workflow_id,
                            "actionID": action_row.action_id,
                            "taskID": run.task_id,
                            "error": action_row.detail,
                        }),
                        &run.tenant_context,
                    ),
                ));
                // Return the stored record (status failed) rather than an `Err`.
                return state.get_workflow_run(&run.run_id).await.with_context(|| {
                    format!("workflow run `{}` missing after failure", run.run_id)
                });
            }
        }
    }
    // Every action succeeded: finalize the run.
    run.status = WorkflowRunStatus::Completed;
    run.finished_at_ms = Some(now_ms());
    let final_run = state
        .update_workflow_run(&run.run_id, |row| {
            row.status = WorkflowRunStatus::Completed;
            row.finished_at_ms = Some(now_ms());
        })
        .await
        .with_context(|| format!("workflow run `{}` missing on completion", run.run_id))?;
    let _ = crate::http::sync_workflow_run_blackboard(state, &final_run).await;
    state.event_bus.publish(EngineEvent::new(
        "workflow.run.completed",
        workflow_event_with_tenant_context(
            json!({
                "runID": final_run.run_id,
                "workflowID": final_run.workflow_id,
                "bindingID": final_run.binding_id,
                "taskID": final_run.task_id,
            }),
            &final_run.tenant_context,
        ),
    ));
    Ok(final_run)
}
965
966async fn execute_action(
967 state: &AppState,
968 run_id: &str,
969 workflow_id: &str,
970 action_spec: &WorkflowActionSpec,
971 action_row: &WorkflowActionRunRecord,
972 tenant_context: &TenantContext,
973 trigger_event: Option<String>,
974) -> anyhow::Result<Value> {
975 let action_name = action_spec.action.as_str();
976 let parsed = parse_workflow_action(action_name);
977 match parsed {
978 ParsedWorkflowAction::EventEmit { event_type } => {
979 let payload = action_payload(action_spec, action_row);
980 state.event_bus.publish(EngineEvent::new(
981 event_type.clone(),
982 workflow_event_with_tenant_context(
983 json!({
984 "workflowID": workflow_id,
985 "actionID": action_row.action_id,
986 "triggerEvent": trigger_event,
987 "payload": payload,
988 }),
989 tenant_context,
990 ),
991 ));
992 Ok(json!({ "eventType": event_type }))
993 }
994 ParsedWorkflowAction::ResourcePut { key } => {
995 let record = state
996 .put_shared_resource(
997 key.clone(),
998 action_payload(action_spec, action_row),
999 None,
1000 "workflow".to_string(),
1001 None,
1002 )
1003 .await
1004 .map_err(|err| anyhow::anyhow!("{err:?}"))?;
1005 Ok(json!({ "key": record.key, "rev": record.rev }))
1006 }
1007 ParsedWorkflowAction::ResourcePatch { key } => {
1008 let current = state.get_shared_resource(&key).await;
1009 let next_rev = current.as_ref().map(|row| row.rev);
1010 let record = state
1011 .put_shared_resource(
1012 key.clone(),
1013 merge_object(
1014 current.map(|row| row.value).unwrap_or_else(|| json!({})),
1015 action_payload(action_spec, action_row),
1016 ),
1017 next_rev,
1018 "workflow".to_string(),
1019 None,
1020 )
1021 .await
1022 .map_err(|err| anyhow::anyhow!("{err:?}"))?;
1023 Ok(json!({ "key": record.key, "rev": record.rev }))
1024 }
1025 ParsedWorkflowAction::ResourceDelete { key } => {
1026 let deleted = state
1027 .delete_shared_resource(&key, None)
1028 .await
1029 .map_err(|err| anyhow::anyhow!("{err:?}"))?;
1030 Ok(json!({ "key": key, "deleted": deleted.is_some() }))
1031 }
1032 ParsedWorkflowAction::Tool { tool_name } => {
1033 let payload = action_payload(action_spec, action_row);
1034 let result = state.tools.execute(&tool_name, payload.clone()).await?;
1035 let mut response = json!({
1036 "tool": tool_name,
1037 "output": result.output,
1038 "metadata": result.metadata,
1039 });
1040 if let Some(external_action) = record_workflow_external_action(
1041 state,
1042 run_id,
1043 workflow_id,
1044 action_row,
1045 trigger_event.clone(),
1046 WorkflowExternalActionExecution::Tool {
1047 tool_name: tool_name.clone(),
1048 },
1049 &payload,
1050 &response,
1051 )
1052 .await?
1053 {
1054 if let Some(obj) = response.as_object_mut() {
1055 obj.insert("external_action".to_string(), external_action);
1056 }
1057 }
1058 Ok(response)
1059 }
1060 ParsedWorkflowAction::Capability { capability_id } => {
1061 let bindings = state.capability_resolver.list_bindings().await?;
1062 let tool_name = bindings
1063 .bindings
1064 .iter()
1065 .find(|binding| binding.capability_id == capability_id)
1066 .map(|binding| binding.tool_name.clone())
1067 .unwrap_or_else(|| capability_id.clone());
1068 let payload = action_payload(action_spec, action_row);
1069 let result = state.tools.execute(&tool_name, payload.clone()).await?;
1070 let mut response = json!({
1071 "capability": capability_id,
1072 "tool": tool_name,
1073 "output": result.output,
1074 "metadata": result.metadata,
1075 });
1076 if let Some(external_action) = record_workflow_external_action(
1077 state,
1078 run_id,
1079 workflow_id,
1080 action_row,
1081 trigger_event.clone(),
1082 WorkflowExternalActionExecution::Capability {
1083 capability_id: capability_id.clone(),
1084 tool_name: tool_name.clone(),
1085 },
1086 &payload,
1087 &response,
1088 )
1089 .await?
1090 {
1091 if let Some(obj) = response.as_object_mut() {
1092 obj.insert("external_action".to_string(), external_action);
1093 }
1094 }
1095 Ok(response)
1096 }
1097 ParsedWorkflowAction::Workflow { workflow_id } => {
1098 anyhow::bail!("nested workflow action `{workflow_id}` is not supported in this slice")
1099 }
1100 ParsedWorkflowAction::Agent { agent_id } => {
1101 let workspace_root = state.workspace_index.snapshot().await.root;
1102 let session = Session::new(
1103 Some(format!("Workflow {} / {}", workflow_id, agent_id)),
1104 Some(workspace_root.clone()),
1105 );
1106 let session_id = session.id.clone();
1107 state.storage.save_session(session).await?;
1108 let prompt = action_spec
1109 .with
1110 .as_ref()
1111 .and_then(|v| v.get("prompt"))
1112 .and_then(|v| v.as_str())
1113 .map(ToString::to_string)
1114 .unwrap_or_else(|| format!("Execute workflow action `{}`", action_name));
1115 let request = SendMessageRequest {
1116 parts: vec![MessagePartInput::Text { text: prompt }],
1117 model: None,
1118 agent: Some(agent_id.clone()),
1119 tool_mode: None,
1120 tool_allowlist: None,
1121 context_mode: None,
1122 write_required: None,
1123 prewrite_requirements: None,
1124 };
1125 state
1126 .engine_loop
1127 .run_prompt_async_with_context(
1128 session_id.clone(),
1129 request,
1130 Some(format!("workflow:{workflow_id}")),
1131 )
1132 .await?;
1133 Ok(json!({ "agentID": agent_id, "sessionID": session_id }))
1134 }
1135 }
1136}
1137
/// Describes how a workflow action reached an external tool, so the matching
/// capability binding can be looked up when recording the external action.
#[derive(Debug, Clone, PartialEq, Eq)]
enum WorkflowExternalActionExecution {
    /// The action named a tool directly.
    Tool {
        /// Name of the tool that was executed.
        tool_name: String,
    },
    /// The action named a capability that was resolved to a tool.
    Capability {
        /// Identifier of the capability the action requested.
        capability_id: String,
        /// Name of the tool that was executed for the capability.
        tool_name: String,
    },
}
1147
1148async fn record_workflow_external_action(
1149 state: &AppState,
1150 run_id: &str,
1151 workflow_id: &str,
1152 action_row: &WorkflowActionRunRecord,
1153 trigger_event: Option<String>,
1154 execution: WorkflowExternalActionExecution,
1155 payload: &Value,
1156 result: &Value,
1157) -> anyhow::Result<Option<Value>> {
1158 let bindings = state.capability_resolver.list_bindings().await?;
1159 let binding = match execution {
1160 WorkflowExternalActionExecution::Tool { ref tool_name } => bindings
1161 .bindings
1162 .iter()
1163 .find(|binding| workflow_binding_matches_tool_name(binding, tool_name)),
1164 WorkflowExternalActionExecution::Capability {
1165 ref capability_id,
1166 ref tool_name,
1167 } => bindings.bindings.iter().find(|binding| {
1168 binding.capability_id == *capability_id
1169 && workflow_binding_matches_tool_name(binding, tool_name)
1170 }),
1171 };
1172 let Some(binding) = binding else {
1173 return Ok(None);
1174 };
1175
1176 let target = workflow_external_action_target(payload, result);
1177 let source_id = format!("{run_id}:{}", action_row.action_id);
1178 let idempotency_key = crate::sha256_hex(&[
1179 workflow_id,
1180 run_id,
1181 &action_row.action_id,
1182 &action_row.action,
1183 &payload.to_string(),
1184 ]);
1185 let action = crate::ExternalActionRecord {
1186 action_id: format!("workflow-external-{}", &idempotency_key[..16]),
1187 operation: binding.capability_id.clone(),
1188 status: "posted".to_string(),
1189 source_kind: Some("workflow".to_string()),
1190 source_id: Some(source_id.clone()),
1191 routine_run_id: None,
1192 context_run_id: Some(crate::http::context_runs::workflow_context_run_id(run_id)),
1193 capability_id: Some(binding.capability_id.clone()),
1194 provider: Some(binding.provider.clone()),
1195 target,
1196 approval_state: Some("executed".to_string()),
1197 idempotency_key: Some(idempotency_key),
1198 receipt: Some(result.clone()),
1199 error: None,
1200 metadata: Some(json!({
1201 "workflowID": workflow_id,
1202 "workflowRunID": run_id,
1203 "actionID": action_row.action_id,
1204 "action": action_row.action,
1205 "taskID": action_row.task_id,
1206 "triggerEvent": trigger_event,
1207 "tool": binding.tool_name,
1208 "provider": binding.provider,
1209 "input": payload,
1210 })),
1211 created_at_ms: action_row.updated_at_ms,
1212 updated_at_ms: action_row.updated_at_ms,
1213 };
1214 let recorded = state.record_external_action(action).await?;
1215 Ok(Some(serde_json::to_value(&recorded)?))
1216}
1217
1218fn workflow_binding_matches_tool_name(
1219 binding: &crate::capability_resolver::CapabilityBinding,
1220 tool_name: &str,
1221) -> bool {
1222 binding.tool_name.eq_ignore_ascii_case(tool_name)
1223 || binding
1224 .tool_name_aliases
1225 .iter()
1226 .any(|alias| alias.eq_ignore_ascii_case(tool_name))
1227}
1228
1229fn workflow_external_action_target(payload: &Value, result: &Value) -> Option<String> {
1230 for candidate in [
1231 payload.pointer("/owner_repo").and_then(Value::as_str),
1232 payload.pointer("/repo").and_then(Value::as_str),
1233 payload.pointer("/repository").and_then(Value::as_str),
1234 payload.pointer("/channel").and_then(Value::as_str),
1235 payload.pointer("/channel_id").and_then(Value::as_str),
1236 payload.pointer("/thread_ts").and_then(Value::as_str),
1237 result.pointer("/metadata/channel").and_then(Value::as_str),
1238 result.pointer("/metadata/repo").and_then(Value::as_str),
1239 ] {
1240 let trimmed = candidate.map(str::trim).unwrap_or_default();
1241 if !trimmed.is_empty() {
1242 return Some(trimmed.to_string());
1243 }
1244 }
1245 None
1246}
1247
1248fn action_payload(action_spec: &WorkflowActionSpec, action_row: &WorkflowActionRunRecord) -> Value {
1249 action_spec
1250 .with
1251 .clone()
1252 .unwrap_or_else(|| json!({ "action_id": action_row.action_id }))
1253}
1254
1255fn merge_object(current: Value, patch: Value) -> Value {
1256 if let (Some(mut current_obj), Some(patch_obj)) =
1257 (current.as_object().cloned(), patch.as_object())
1258 {
1259 for (key, value) in patch_obj {
1260 current_obj.insert(key.clone(), value.clone());
1261 }
1262 Value::Object(current_obj)
1263 } else {
1264 patch
1265 }
1266}
1267
1268fn source_event_id(event: &EngineEvent) -> String {
1269 if let Some(id) = event.properties.get("event_id").and_then(|v| v.as_str()) {
1270 return id.to_string();
1271 }
1272 for key in ["runID", "runId", "task_id", "taskID", "sessionID"] {
1273 if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1274 return format!("{}:{id}", event.event_type);
1275 }
1276 }
1277 format!("{}:{}", event.event_type, event.properties)
1278}
1279
1280fn task_id_from_event(event: &EngineEvent) -> Option<String> {
1281 for key in ["task_id", "taskID", "step_id", "stepID"] {
1282 if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1283 let trimmed = id.trim();
1284 if !trimmed.is_empty() {
1285 return Some(trimmed.to_string());
1286 }
1287 }
1288 }
1289 None
1290}
1291
/// Compares two event names, ignoring surrounding whitespace and ASCII case.
fn event_name_matches(expected: &str, actual: &str) -> bool {
    let wanted = expected.trim();
    let seen = actual.trim();
    wanted.eq_ignore_ascii_case(seen)
}