1use anyhow::Context;
2use serde_json::{json, Value};
3use std::time::Duration;
4use tandem_types::{EngineEvent, MessagePartInput, SendMessageRequest, Session, TenantContext};
5use tandem_workflows::{
6 WorkflowActionRunRecord, WorkflowActionRunStatus, WorkflowActionSpec, WorkflowHookBinding,
7 WorkflowRunRecord, WorkflowRunStatus, WorkflowSimulationResult, WorkflowSpec,
8};
9use uuid::Uuid;
10
11use crate::{now_ms, AppState, WorkflowSourceRef};
12
/// A workflow action paired with the identifier used to track it inside a run.
#[derive(Debug, Clone)]
struct PreparedWorkflowAction {
    /// Identifier of the action within the run: a workflow step id, or a
    /// generated `action_N` when the action comes from a hook binding.
    action_id: String,
    /// The action expression together with its optional `with` payload.
    spec: WorkflowActionSpec,
}
18
19fn workflow_event_tenant_context_value(tenant_context: &TenantContext) -> Value {
20 serde_json::to_value(tenant_context).unwrap_or_else(|_| json!(tenant_context))
21}
22
23fn workflow_event_with_tenant_context(mut payload: Value, tenant_context: &TenantContext) -> Value {
24 if let Some(map) = payload.as_object_mut() {
25 map.entry("tenantContext".to_string())
26 .or_insert_with(|| workflow_event_tenant_context_value(tenant_context));
27 }
28 payload
29}
30
31fn event_tenant_context(event: &EngineEvent) -> TenantContext {
32 event
33 .properties
34 .get("tenantContext")
35 .and_then(|value| serde_json::from_value(value.clone()).ok())
36 .unwrap_or_else(TenantContext::local_implicit)
37}
38
39fn workflow_action_objective(action: &str, with: Option<&Value>) -> String {
40 match with {
41 Some(with) if !with.is_null() => {
42 format!("Execute workflow action `{action}` with payload {with}.")
43 }
44 _ => format!("Execute workflow action `{action}`."),
45 }
46}
47
48fn workflow_manual_schedule() -> crate::AutomationV2Schedule {
49 crate::AutomationV2Schedule {
50 schedule_type: crate::AutomationV2ScheduleType::Manual,
51 cron_expression: None,
52 interval_seconds: None,
53 timezone: "UTC".to_string(),
54 misfire_policy: crate::RoutineMisfirePolicy::RunOnce,
55 }
56}
57
58fn workflow_execution_plan(
59 workflow_id: &str,
60 name: &str,
61 description: Option<String>,
62 actions: &[PreparedWorkflowAction],
63 source_label: &str,
64 source: Option<&WorkflowSourceRef>,
65 trigger_event: Option<&str>,
66) -> crate::WorkflowPlan {
67 crate::WorkflowPlan {
68 plan_id: format!("workflow-plan-{workflow_id}"),
69 planner_version: "workflow_runtime_v1".to_string(),
70 plan_source: source_label.to_string(),
71 original_prompt: description.clone().unwrap_or_else(|| name.to_string()),
72 normalized_prompt: description.clone().unwrap_or_else(|| name.to_string()),
73 confidence: "high".to_string(),
74 title: name.to_string(),
75 description,
76 schedule: workflow_manual_schedule(),
77 execution_target: "automation_v2".to_string(),
78 workspace_root: std::env::current_dir()
79 .unwrap_or_else(|_| std::path::PathBuf::from("."))
80 .to_string_lossy()
81 .to_string(),
82 steps: actions
83 .iter()
84 .map(|action| crate::WorkflowPlanStep {
85 step_id: action.action_id.clone(),
86 kind: "workflow_action".to_string(),
87 objective: workflow_action_objective(
88 &action.spec.action,
89 action.spec.with.as_ref(),
90 ),
91 depends_on: Vec::new(),
92 agent_role: "operator".to_string(),
93 input_refs: Vec::new(),
94 output_contract: Some(crate::AutomationFlowOutputContract {
95 kind: "generic_artifact".to_string(),
96 validator: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
97 enforcement: None,
98 schema: None,
99 summary_guidance: None,
100 }),
101 metadata: None,
102 })
103 .collect(),
104 requires_integrations: Vec::new(),
105 allowed_mcp_servers: Vec::new(),
106 operator_preferences: Some(json!({
107 "source": source_label,
108 "tool_access_mode": "auto",
109 })),
110 save_options: json!({
111 "origin": source_label,
112 "workflow_source": source,
113 "trigger_event": trigger_event,
114 }),
115 }
116}
117
118pub(crate) fn compile_workflow_spec_to_automation_preview(
119 workflow: &WorkflowSpec,
120) -> crate::AutomationV2Spec {
121 let actions = workflow
122 .steps
123 .iter()
124 .map(|step| PreparedWorkflowAction {
125 action_id: step.step_id.clone(),
126 spec: WorkflowActionSpec {
127 action: step.action.clone(),
128 with: step.with.clone(),
129 },
130 })
131 .collect::<Vec<_>>();
132 let mut automation = crate::http::compile_plan_to_automation_v2(
133 &workflow_execution_plan(
134 &workflow.workflow_id,
135 &workflow.name,
136 workflow.description.clone(),
137 &actions,
138 "workflow_registry",
139 workflow.source.as_ref(),
140 None,
141 ),
142 None,
143 "workflow_registry",
144 );
145 if let Some(metadata) = automation.metadata.as_mut().and_then(Value::as_object_mut) {
146 metadata.insert("workflow_id".to_string(), json!(workflow.workflow_id));
147 metadata.insert("workflow_name".to_string(), json!(workflow.name));
148 metadata.insert("workflow_source".to_string(), json!(workflow.source));
149 metadata.insert("workflow_enabled".to_string(), json!(workflow.enabled));
150 }
151 automation
152}
153
154fn compile_workflow_run_automation(
155 workflow_id: &str,
156 workflow_name: Option<&str>,
157 workflow_description: Option<&str>,
158 binding_id: Option<&str>,
159 actions: &[PreparedWorkflowAction],
160 source: Option<&WorkflowSourceRef>,
161 trigger_event: Option<&str>,
162) -> crate::AutomationV2Spec {
163 let automation_id = binding_id
164 .map(|binding| format!("workflow-hook-automation-{workflow_id}-{binding}"))
165 .unwrap_or_else(|| format!("workflow-automation-{workflow_id}"));
166 let title = binding_id
167 .map(|binding| {
168 workflow_name
169 .map(|name| format!("{name} hook {binding}"))
170 .unwrap_or_else(|| format!("Workflow Hook {workflow_id}:{binding}"))
171 })
172 .unwrap_or_else(|| {
173 workflow_name
174 .map(|name| format!("{name} execution"))
175 .unwrap_or_else(|| format!("Workflow {workflow_id}"))
176 });
177 let mut automation = crate::http::compile_plan_to_automation_v2(
178 &workflow_execution_plan(
179 &automation_id,
180 &title,
181 Some(
182 workflow_description
183 .map(|description| description.to_string())
184 .unwrap_or_else(|| format!("Mirrored workflow execution for `{workflow_id}`.")),
185 ),
186 actions,
187 "workflow_runtime",
188 source,
189 trigger_event,
190 ),
191 None,
192 "workflow_runtime",
193 );
194 automation.automation_id = automation_id.clone();
195 automation.name = title;
196 automation.metadata = Some(json!({
197 "workflow_id": workflow_id,
198 "binding_id": binding_id,
199 "workflow_source": source,
200 "trigger_event": trigger_event,
201 "origin": "workflow_runtime_mirror",
202 }));
203 automation
204}
205
206async fn sync_workflow_automation_run_start(
207 state: &AppState,
208 automation: &crate::AutomationV2Spec,
209 run_id: &str,
210) -> anyhow::Result<crate::AutomationV2RunRecord> {
211 let updated = state
212 .update_automation_v2_run(run_id, |run| {
213 run.status = crate::AutomationRunStatus::Running;
214 run.started_at_ms.get_or_insert_with(now_ms);
215 crate::app::state::automation::lifecycle::record_automation_lifecycle_event(
216 run,
217 "workflow_run_started",
218 Some("workflow runtime mirror started".to_string()),
219 None,
220 );
221 crate::app::state::refresh_automation_runtime_state(automation, run);
222 })
223 .await
224 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
225 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
226 .await
227 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
228 Ok(updated)
229}
230
231async fn sync_workflow_automation_action_started(
232 state: &AppState,
233 automation: &crate::AutomationV2Spec,
234 run_id: &str,
235 action_id: &str,
236) -> anyhow::Result<crate::AutomationV2RunRecord> {
237 let updated = state
238 .update_automation_v2_run(run_id, |run| {
239 let next_attempt = run
240 .checkpoint
241 .node_attempts
242 .get(action_id)
243 .copied()
244 .unwrap_or(0)
245 .saturating_add(1);
246 run.checkpoint
247 .node_attempts
248 .insert(action_id.to_string(), next_attempt);
249 run.detail = Some(format!("Running workflow action `{action_id}`"));
250 crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
251 run,
252 "workflow_action_started",
253 Some(format!("workflow action `{action_id}` started")),
254 None,
255 Some(json!({
256 "action_id": action_id,
257 "attempt": next_attempt,
258 })),
259 );
260 crate::app::state::refresh_automation_runtime_state(automation, run);
261 })
262 .await
263 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
264 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
265 .await
266 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
267 Ok(updated)
268}
269
270async fn sync_workflow_automation_action_completed(
271 state: &AppState,
272 automation: &crate::AutomationV2Spec,
273 run_id: &str,
274 action_id: &str,
275 output: &Value,
276) -> anyhow::Result<crate::AutomationV2RunRecord> {
277 let action_count = automation.flow.nodes.len();
278 let updated = state
279 .update_automation_v2_run(run_id, |run| {
280 run.checkpoint.pending_nodes.retain(|id| id != action_id);
281 if !run
282 .checkpoint
283 .completed_nodes
284 .iter()
285 .any(|id| id == action_id)
286 {
287 run.checkpoint.completed_nodes.push(action_id.to_string());
288 }
289 run.checkpoint.node_outputs.insert(
290 action_id.to_string(),
291 json!(crate::AutomationNodeOutput {
292 contract_kind: "generic_artifact".to_string(),
293 validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
294 validator_summary: Some(crate::AutomationValidatorSummary {
295 kind: crate::AutomationOutputValidatorKind::GenericArtifact,
296 outcome: "accepted".to_string(),
297 reason: Some("workflow action completed".to_string()),
298 unmet_requirements: Vec::new(),
299 warning_requirements: Vec::new(),
300 warning_count: 0,
301 accepted_candidate_source: Some("workflow_runtime".to_string()),
302 verification_outcome: Some("not_applicable".to_string()),
303 validation_basis: None,
304 repair_attempted: false,
305 repair_attempt: 0,
306 repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
307 as u32,
308 repair_succeeded: false,
309 repair_exhausted: false,
310 }),
311 summary: format!("Workflow action `{action_id}` completed"),
312 content: json!({
313 "action_id": action_id,
314 "output": output,
315 }),
316 created_at_ms: now_ms(),
317 node_id: action_id.to_string(),
318 status: Some("completed".to_string()),
319 blocked_reason: None,
320 approved: None,
321 workflow_class: Some("workflow_action".to_string()),
322 phase: Some("execution".to_string()),
323 failure_kind: None,
324 tool_telemetry: None,
325 preflight: None,
326 knowledge_preflight: None,
327 capability_resolution: None,
328 attempt_evidence: None,
329 blocker_category: None,
330 fallback_used: None,
331 artifact_validation: None,
332 receipt_timeline: None,
333 quality_mode: Some("strict_research_v1".to_string()),
334 requested_quality_mode: None,
335 emergency_rollback_enabled: Some(false),
336 provenance: Some(crate::AutomationNodeOutputProvenance {
337 session_id: format!("workflow-runtime-{run_id}"),
338 node_id: action_id.to_string(),
339 run_id: Some(run_id.to_string()),
340 output_path: None,
341 content_digest: None,
342 accepted_candidate_source: Some("workflow_runtime".to_string()),
343 validation_outcome: Some("not_applicable".to_string()),
344 repair_attempt: Some(0),
345 repair_succeeded: Some(false),
346 reuse_allowed: Some(false),
347 freshness: crate::AutomationNodeOutputFreshness {
348 current_run: true,
349 current_attempt: true,
350 },
351 }),
352 }),
353 );
354 if run.checkpoint.completed_nodes.len() >= action_count {
355 run.status = crate::AutomationRunStatus::Completed;
356 run.detail = Some("workflow runtime mirror completed".to_string());
357 }
358 crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
359 run,
360 "workflow_action_completed",
361 Some(format!("workflow action `{action_id}` completed")),
362 None,
363 Some(json!({
364 "action_id": action_id,
365 })),
366 );
367 crate::app::state::refresh_automation_runtime_state(automation, run);
368 })
369 .await
370 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
371 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
372 .await
373 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
374 Ok(updated)
375}
376
377async fn sync_workflow_automation_action_failed(
378 state: &AppState,
379 automation: &crate::AutomationV2Spec,
380 run_id: &str,
381 action_id: &str,
382 error: &str,
383) -> anyhow::Result<crate::AutomationV2RunRecord> {
384 let updated = state
385 .update_automation_v2_run(run_id, |run| {
386 run.status = crate::AutomationRunStatus::Failed;
387 run.detail = Some(format!("Workflow action `{action_id}` failed"));
388 run.checkpoint.pending_nodes.retain(|id| id != action_id);
389 run.checkpoint.last_failure = Some(crate::AutomationFailureRecord {
390 node_id: action_id.to_string(),
391 reason: error.to_string(),
392 failed_at_ms: now_ms(),
393 });
394 run.checkpoint.node_outputs.insert(
395 action_id.to_string(),
396 json!(crate::AutomationNodeOutput {
397 contract_kind: "generic_artifact".to_string(),
398 validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
399 validator_summary: Some(crate::AutomationValidatorSummary {
400 kind: crate::AutomationOutputValidatorKind::GenericArtifact,
401 outcome: "rejected".to_string(),
402 reason: Some(error.to_string()),
403 unmet_requirements: vec![error.to_string()],
404 warning_requirements: Vec::new(),
405 warning_count: 0,
406 accepted_candidate_source: None,
407 verification_outcome: Some("failed".to_string()),
408 validation_basis: None,
409 repair_attempted: false,
410 repair_attempt: 0,
411 repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
412 as u32,
413 repair_succeeded: false,
414 repair_exhausted: false,
415 }),
416 summary: format!("Workflow action `{action_id}` failed"),
417 content: json!({
418 "action_id": action_id,
419 "error": error,
420 }),
421 created_at_ms: now_ms(),
422 node_id: action_id.to_string(),
423 status: Some("failed".to_string()),
424 blocked_reason: Some(error.to_string()),
425 approved: None,
426 workflow_class: Some("workflow_action".to_string()),
427 phase: Some("execution".to_string()),
428 failure_kind: Some("workflow_action_failed".to_string()),
429 tool_telemetry: None,
430 preflight: None,
431 knowledge_preflight: None,
432 capability_resolution: None,
433 attempt_evidence: None,
434 blocker_category: Some("tool_result_unusable".to_string()),
435 fallback_used: None,
436 artifact_validation: None,
437 receipt_timeline: None,
438 quality_mode: Some("strict_research_v1".to_string()),
439 requested_quality_mode: None,
440 emergency_rollback_enabled: Some(false),
441 provenance: Some(crate::AutomationNodeOutputProvenance {
442 session_id: format!("workflow-runtime-{run_id}"),
443 node_id: action_id.to_string(),
444 run_id: Some(run_id.to_string()),
445 output_path: None,
446 content_digest: None,
447 accepted_candidate_source: None,
448 validation_outcome: Some("failed".to_string()),
449 repair_attempt: Some(0),
450 repair_succeeded: Some(false),
451 reuse_allowed: Some(false),
452 freshness: crate::AutomationNodeOutputFreshness {
453 current_run: true,
454 current_attempt: true,
455 },
456 }),
457 }),
458 );
459 crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
460 run,
461 "workflow_action_failed",
462 Some(format!("workflow action `{action_id}` failed")),
463 None,
464 Some(json!({
465 "action_id": action_id,
466 "error": error,
467 })),
468 );
469 crate::app::state::refresh_automation_runtime_state(automation, run);
470 })
471 .await
472 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
473 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
474 .await
475 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
476 Ok(updated)
477}
478
/// The kinds of workflow action recognised by [`parse_workflow_action`].
///
/// Each variant carries the trimmed text that followed the action's prefix.
#[derive(Debug, Clone)]
pub enum ParsedWorkflowAction {
    /// `event:<event_type>`
    EventEmit { event_type: String },
    /// `resource:put:<key>`
    ResourcePut { key: String },
    /// `resource:patch:<key>`
    ResourcePatch { key: String },
    /// `resource:delete:<key>`
    ResourceDelete { key: String },
    /// `tool:<tool_name>`
    Tool { tool_name: String },
    /// `capability:<capability_id>`, or any string without a known prefix.
    Capability { capability_id: String },
    /// `workflow:<workflow_id>`
    Workflow { workflow_id: String },
    /// `agent:<agent_id>`
    Agent { agent_id: String },
}

/// Parses a workflow action expression into a [`ParsedWorkflowAction`].
///
/// The input is trimmed, then matched against the known prefixes in a fixed
/// order; the remainder after the prefix is trimmed again and becomes the
/// variant's payload. Input with no known prefix is treated as a capability id.
pub fn parse_workflow_action(action: &str) -> ParsedWorkflowAction {
    let trimmed = action.trim();
    // Returns the trimmed remainder when `trimmed` starts with `prefix`.
    let argument_after = |prefix: &str| {
        trimmed
            .strip_prefix(prefix)
            .map(|rest| rest.trim().to_string())
    };

    if let Some(event_type) = argument_after("event:") {
        ParsedWorkflowAction::EventEmit { event_type }
    } else if let Some(key) = argument_after("resource:put:") {
        ParsedWorkflowAction::ResourcePut { key }
    } else if let Some(key) = argument_after("resource:patch:") {
        ParsedWorkflowAction::ResourcePatch { key }
    } else if let Some(key) = argument_after("resource:delete:") {
        ParsedWorkflowAction::ResourceDelete { key }
    } else if let Some(tool_name) = argument_after("tool:") {
        ParsedWorkflowAction::Tool { tool_name }
    } else if let Some(capability_id) = argument_after("capability:") {
        ParsedWorkflowAction::Capability { capability_id }
    } else if let Some(workflow_id) = argument_after("workflow:") {
        ParsedWorkflowAction::Workflow { workflow_id }
    } else if let Some(agent_id) = argument_after("agent:") {
        ParsedWorkflowAction::Agent { agent_id }
    } else {
        ParsedWorkflowAction::Capability {
            capability_id: trimmed.to_string(),
        }
    }
}
537
538pub fn canonical_workflow_event_names(event: &EngineEvent) -> Vec<String> {
539 let mut names = vec![event.event_type.clone(), event.event_type.replace('.', "_")];
540 match event.event_type.as_str() {
541 "context.task.created" => names.push("task_created".to_string()),
542 "context.task.started" => names.push("task_started".to_string()),
543 "context.task.completed" => names.push("task_completed".to_string()),
544 "context.task.failed" => names.push("task_failed".to_string()),
545 "workflow.run.started" | "routine.run.created" => {
546 names.push("workflow_started".to_string())
547 }
548 "workflow.run.completed" | "routine.run.completed" => {
549 names.push("workflow_completed".to_string())
550 }
551 "workflow.run.failed" | "routine.run.failed" => names.push("task_failed".to_string()),
552 _ => {}
553 }
554 names.sort();
555 names.dedup();
556 names
557}
558
559pub async fn simulate_workflow_event(
560 state: &AppState,
561 event: &EngineEvent,
562) -> WorkflowSimulationResult {
563 let registry = state.workflow_registry().await;
564 let canonical = canonical_workflow_event_names(event);
565 let matched_bindings = registry
566 .hooks
567 .into_iter()
568 .filter(|hook| {
569 hook.enabled
570 && canonical
571 .iter()
572 .any(|name| event_name_matches(&hook.event, name))
573 })
574 .collect::<Vec<_>>();
575 let planned_actions = matched_bindings
576 .iter()
577 .flat_map(|hook| hook.actions.clone())
578 .collect::<Vec<_>>();
579 WorkflowSimulationResult {
580 matched_bindings,
581 planned_actions,
582 canonical_events: canonical,
583 }
584}
585
586pub async fn dispatch_workflow_event(state: &AppState, event: &EngineEvent) {
587 let simulation = simulate_workflow_event(state, event).await;
588 if simulation.matched_bindings.is_empty() {
589 return;
590 }
591 for hook in simulation.matched_bindings {
592 let source_event_id = source_event_id(event);
593 let task_id = task_id_from_event(event);
594 let tenant_context = event_tenant_context(event);
595 let dedupe_key = format!("{}::{source_event_id}", hook.binding_id);
596 {
597 let mut seen = state.workflow_dispatch_seen.write().await;
598 if seen.contains_key(&dedupe_key) {
599 continue;
600 }
601 seen.insert(dedupe_key, now_ms());
602 }
603 let _ = execute_hook_binding(
604 state,
605 &hook,
606 tenant_context.clone(),
607 Some(event.event_type.clone()),
608 Some(source_event_id),
609 task_id,
610 false,
611 )
612 .await;
613 }
614}
615
616pub async fn run_workflow_dispatcher(state: AppState) {
617 if !state.wait_until_ready_or_failed(120, 250).await {
618 let startup = state.startup_snapshot().await;
619 tracing::warn!(
620 component = "workflow_dispatcher",
621 startup_status = ?startup.status,
622 startup_phase = %startup.phase,
623 attempt_id = %startup.attempt_id,
624 "workflow dispatcher exiting before runtime access because startup did not become ready"
625 );
626 return;
627 }
628 let mut rx = state.event_bus.subscribe();
629 loop {
630 match rx.recv().await {
631 Ok(event) => dispatch_workflow_event(&state, &event).await,
632 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
633 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
634 }
635 }
636}
637
638pub async fn execute_workflow(
639 state: &AppState,
640 workflow: &WorkflowSpec,
641 tenant_context: TenantContext,
642 trigger_event: Option<String>,
643 source_event_id: Option<String>,
644 task_id: Option<String>,
645 dry_run: bool,
646) -> anyhow::Result<WorkflowRunRecord> {
647 let actions = workflow
648 .steps
649 .iter()
650 .map(|step| PreparedWorkflowAction {
651 action_id: step.step_id.clone(),
652 spec: WorkflowActionSpec {
653 action: step.action.clone(),
654 with: step.with.clone(),
655 },
656 })
657 .collect::<Vec<_>>();
658 execute_actions(
659 state,
660 &workflow.workflow_id,
661 None,
662 actions,
663 Some(workflow.name.clone()),
664 workflow.description.clone(),
665 workflow.source.clone(),
666 tenant_context,
667 trigger_event,
668 source_event_id,
669 task_id,
670 dry_run,
671 )
672 .await
673}
674
675pub async fn execute_hook_binding(
676 state: &AppState,
677 hook: &WorkflowHookBinding,
678 tenant_context: TenantContext,
679 trigger_event: Option<String>,
680 source_event_id: Option<String>,
681 task_id: Option<String>,
682 dry_run: bool,
683) -> anyhow::Result<WorkflowRunRecord> {
684 let workflow = state
685 .get_workflow(&hook.workflow_id)
686 .await
687 .with_context(|| format!("unknown workflow `{}`", hook.workflow_id))?;
688 execute_actions(
689 state,
690 &hook.workflow_id,
691 Some(hook.binding_id.clone()),
692 hook.actions
693 .iter()
694 .enumerate()
695 .map(|(idx, action)| PreparedWorkflowAction {
696 action_id: format!("action_{}", idx + 1),
697 spec: action.clone(),
698 })
699 .collect(),
700 None,
701 None,
702 workflow.source,
703 tenant_context,
704 trigger_event,
705 source_event_id,
706 task_id,
707 dry_run,
708 )
709 .await
710}
711
/// Runs a list of prepared actions as one workflow run and mirrors its progress
/// into an automation v2 run.
///
/// Steps, in order:
/// 1. Compile and store the mirror automation, create its run and mark it started.
/// 2. Persist the `WorkflowRunRecord` and publish `workflow.run.started`.
/// 3. For a dry run, return immediately: the run has status `DryRun` and every
///    action is `Skipped`.
/// 4. Otherwise execute the actions sequentially. Around each action the stored
///    run, the mirror automation run and the blackboard are updated, and
///    `workflow.action.started` / `.completed` / `.failed` events are published.
/// 5. The first failing action marks the run `Failed`, publishes
///    `workflow.run.failed` and returns the stored run; later actions are not run.
/// 6. When all actions succeed the run is marked `Completed` and
///    `workflow.run.completed` is published.
///
/// Results of the mirror-automation action syncs and of the blackboard syncs
/// are deliberately ignored (`let _ =`).
///
/// # Errors
///
/// Fails when storing the automation, creating or starting the automation run,
/// or persisting the workflow run fails, and when the run record cannot be found
/// after a failure or on completion. A failing action is NOT reported as `Err`;
/// it is reflected in the returned run's status.
async fn execute_actions(
    state: &AppState,
    workflow_id: &str,
    binding_id: Option<String>,
    actions: Vec<PreparedWorkflowAction>,
    workflow_name: Option<String>,
    workflow_description: Option<String>,
    source: Option<WorkflowSourceRef>,
    tenant_context: TenantContext,
    trigger_event: Option<String>,
    source_event_id: Option<String>,
    task_id: Option<String>,
    dry_run: bool,
) -> anyhow::Result<WorkflowRunRecord> {
    let run_id = format!("workflow-run-{}", Uuid::new_v4());
    let now = now_ms();
    // Build and register the automation that mirrors this workflow execution.
    let automation = compile_workflow_run_automation(
        workflow_id,
        workflow_name.as_deref(),
        workflow_description.as_deref(),
        binding_id.as_deref(),
        &actions,
        source.as_ref(),
        trigger_event.as_deref(),
    );
    let automation = state.put_automation_v2(automation).await?;
    let automation_run = state
        .create_automation_v2_run(&automation, trigger_event.as_deref().unwrap_or("workflow"))
        .await?;
    // NOTE(review): the mirror run is marked as started before the dry-run early
    // return below, and nothing in this function finalizes it for a dry run —
    // confirm whether that is intended.
    let automation_run =
        sync_workflow_automation_run_start(state, &automation, &automation_run.run_id).await?;
    let mut run = WorkflowRunRecord {
        run_id: run_id.clone(),
        workflow_id: workflow_id.to_string(),
        tenant_context: tenant_context.clone(),
        automation_id: Some(automation.automation_id.clone()),
        automation_run_id: Some(automation_run.run_id.clone()),
        binding_id,
        trigger_event: trigger_event.clone(),
        source_event_id: source_event_id.clone(),
        task_id: task_id.clone(),
        status: if dry_run {
            WorkflowRunStatus::DryRun
        } else {
            WorkflowRunStatus::Running
        },
        created_at_ms: now,
        updated_at_ms: now,
        // A dry run is finished as soon as it is recorded.
        finished_at_ms: if dry_run { Some(now) } else { None },
        actions: actions
            .iter()
            .map(|action| WorkflowActionRunRecord {
                action_id: action.action_id.clone(),
                action: action.spec.action.clone(),
                task_id: task_id.clone(),
                status: if dry_run {
                    WorkflowActionRunStatus::Skipped
                } else {
                    WorkflowActionRunStatus::Pending
                },
                detail: None,
                output: None,
                updated_at_ms: now,
            })
            .collect(),
        source,
    };
    state.put_workflow_run(run.clone()).await?;
    let _ = crate::http::sync_workflow_run_blackboard(state, &run).await;
    state.event_bus.publish(EngineEvent::new(
        "workflow.run.started",
        workflow_event_with_tenant_context(
            json!({
                "runID": run.run_id,
                "workflowID": run.workflow_id,
                "bindingID": run.binding_id,
                "triggerEvent": trigger_event,
                "sourceEventID": source_event_id,
                "taskID": task_id,
                "dryRun": dry_run,
            }),
            &run.tenant_context,
        ),
    ));
    if dry_run {
        return Ok(run);
    }
    // `run.actions` was built from `actions`, so zipping pairs each run row
    // with the spec it was created from.
    for (action_row, action_spec) in run.actions.iter_mut().zip(actions.iter()) {
        action_row.status = WorkflowActionRunStatus::Running;
        action_row.updated_at_ms = now_ms();
        let action_name = action_row.action.clone();
        let _ = sync_workflow_automation_action_started(
            state,
            &automation,
            automation_run.run_id.as_str(),
            &action_row.action_id,
        )
        .await;
        // Copy the local row into the stored run record.
        state
            .update_workflow_run(&run.run_id, |row| {
                if let Some(target) = row
                    .actions
                    .iter_mut()
                    .find(|item| item.action_id == action_row.action_id)
                {
                    *target = action_row.clone();
                }
            })
            .await;
        if let Some(latest) = state.get_workflow_run(&run.run_id).await {
            let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
        }
        state.event_bus.publish(EngineEvent::new(
            "workflow.action.started",
            workflow_event_with_tenant_context(
                json!({
                    "runID": run.run_id,
                    "workflowID": run.workflow_id,
                    "actionID": action_row.action_id,
                    "action": action_name,
                    "taskID": run.task_id,
                }),
                &run.tenant_context,
            ),
        ));
        match execute_action(
            state,
            &run.run_id,
            workflow_id,
            &action_spec.spec,
            action_row,
            &run.tenant_context,
            trigger_event.clone(),
        )
        .await
        {
            Ok(output) => {
                action_row.status = WorkflowActionRunStatus::Completed;
                action_row.output = Some(output.clone());
                action_row.updated_at_ms = now_ms();
                state
                    .update_workflow_run(&run.run_id, |row| {
                        if let Some(target) = row
                            .actions
                            .iter_mut()
                            .find(|item| item.action_id == action_row.action_id)
                        {
                            *target = action_row.clone();
                        }
                    })
                    .await;
                let _ = sync_workflow_automation_action_completed(
                    state,
                    &automation,
                    automation_run.run_id.as_str(),
                    &action_row.action_id,
                    &output,
                )
                .await;
                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
                }
                state.event_bus.publish(EngineEvent::new(
                    "workflow.action.completed",
                    workflow_event_with_tenant_context(
                        json!({
                            "runID": run.run_id,
                            "workflowID": run.workflow_id,
                            "actionID": action_row.action_id,
                            "action": action_name,
                            "taskID": run.task_id,
                            "output": output,
                        }),
                        &run.tenant_context,
                    ),
                ));
            }
            Err(error) => {
                // The first failing action ends the run; remaining actions keep
                // the status they already have.
                let detail = error.to_string();
                action_row.status = WorkflowActionRunStatus::Failed;
                action_row.detail = Some(detail.clone());
                action_row.updated_at_ms = now_ms();
                run.status = WorkflowRunStatus::Failed;
                run.finished_at_ms = Some(now_ms());
                state
                    .update_workflow_run(&run.run_id, |row| {
                        row.status = WorkflowRunStatus::Failed;
                        row.finished_at_ms = Some(now_ms());
                        if let Some(target) = row
                            .actions
                            .iter_mut()
                            .find(|item| item.action_id == action_row.action_id)
                        {
                            *target = action_row.clone();
                        }
                    })
                    .await;
                let _ = sync_workflow_automation_action_failed(
                    state,
                    &automation,
                    automation_run.run_id.as_str(),
                    &action_row.action_id,
                    &detail,
                )
                .await;
                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
                }
                state.event_bus.publish(EngineEvent::new(
                    "workflow.action.failed",
                    workflow_event_with_tenant_context(
                        json!({
                            "runID": run.run_id,
                            "workflowID": run.workflow_id,
                            "actionID": action_row.action_id,
                            "action": action_name,
                            "taskID": run.task_id,
                            "error": detail,
                        }),
                        &run.tenant_context,
                    ),
                ));
                state.event_bus.publish(EngineEvent::new(
                    "workflow.run.failed",
                    workflow_event_with_tenant_context(
                        json!({
                            "runID": run.run_id,
                            "workflowID": run.workflow_id,
                            "actionID": action_row.action_id,
                            "taskID": run.task_id,
                            "error": action_row.detail,
                        }),
                        &run.tenant_context,
                    ),
                ));
                // Return the stored record rather than the local copy.
                return state.get_workflow_run(&run.run_id).await.with_context(|| {
                    format!("workflow run `{}` missing after failure", run.run_id)
                });
            }
        }
    }
    run.status = WorkflowRunStatus::Completed;
    run.finished_at_ms = Some(now_ms());
    let final_run = state
        .update_workflow_run(&run.run_id, |row| {
            row.status = WorkflowRunStatus::Completed;
            row.finished_at_ms = Some(now_ms());
        })
        .await
        .with_context(|| format!("workflow run `{}` missing on completion", run.run_id))?;
    let _ = crate::http::sync_workflow_run_blackboard(state, &final_run).await;
    state.event_bus.publish(EngineEvent::new(
        "workflow.run.completed",
        workflow_event_with_tenant_context(
            json!({
                "runID": final_run.run_id,
                "workflowID": final_run.workflow_id,
                "bindingID": final_run.binding_id,
                "taskID": final_run.task_id,
            }),
            &final_run.tenant_context,
        ),
    ));
    Ok(final_run)
}
977
978async fn execute_action(
979 state: &AppState,
980 run_id: &str,
981 workflow_id: &str,
982 action_spec: &WorkflowActionSpec,
983 action_row: &WorkflowActionRunRecord,
984 tenant_context: &TenantContext,
985 trigger_event: Option<String>,
986) -> anyhow::Result<Value> {
987 let action_name = action_spec.action.as_str();
988 let parsed = parse_workflow_action(action_name);
989 match parsed {
990 ParsedWorkflowAction::EventEmit { event_type } => {
991 let payload = action_payload(action_spec, action_row);
992 state.event_bus.publish(EngineEvent::new(
993 event_type.clone(),
994 workflow_event_with_tenant_context(
995 json!({
996 "workflowID": workflow_id,
997 "actionID": action_row.action_id,
998 "triggerEvent": trigger_event,
999 "payload": payload,
1000 }),
1001 tenant_context,
1002 ),
1003 ));
1004 Ok(json!({ "eventType": event_type }))
1005 }
1006 ParsedWorkflowAction::ResourcePut { key } => {
1007 let record = state
1008 .put_shared_resource(
1009 key.clone(),
1010 action_payload(action_spec, action_row),
1011 None,
1012 "workflow".to_string(),
1013 None,
1014 )
1015 .await
1016 .map_err(|err| anyhow::anyhow!("{err:?}"))?;
1017 Ok(json!({ "key": record.key, "rev": record.rev }))
1018 }
1019 ParsedWorkflowAction::ResourcePatch { key } => {
1020 let current = state.get_shared_resource(&key).await;
1021 let next_rev = current.as_ref().map(|row| row.rev);
1022 let record = state
1023 .put_shared_resource(
1024 key.clone(),
1025 merge_object(
1026 current.map(|row| row.value).unwrap_or_else(|| json!({})),
1027 action_payload(action_spec, action_row),
1028 ),
1029 next_rev,
1030 "workflow".to_string(),
1031 None,
1032 )
1033 .await
1034 .map_err(|err| anyhow::anyhow!("{err:?}"))?;
1035 Ok(json!({ "key": record.key, "rev": record.rev }))
1036 }
1037 ParsedWorkflowAction::ResourceDelete { key } => {
1038 let deleted = state
1039 .delete_shared_resource(&key, None)
1040 .await
1041 .map_err(|err| anyhow::anyhow!("{err:?}"))?;
1042 Ok(json!({ "key": key, "deleted": deleted.is_some() }))
1043 }
1044 ParsedWorkflowAction::Tool { tool_name } => {
1045 let payload = action_payload(action_spec, action_row);
1046 let result = state.tools.execute(&tool_name, payload.clone()).await?;
1047 let mut response = json!({
1048 "tool": tool_name,
1049 "output": result.output,
1050 "metadata": result.metadata,
1051 });
1052 if let Some(external_action) = record_workflow_external_action(
1053 state,
1054 run_id,
1055 workflow_id,
1056 action_row,
1057 trigger_event.clone(),
1058 WorkflowExternalActionExecution::Tool {
1059 tool_name: tool_name.clone(),
1060 },
1061 &payload,
1062 &response,
1063 )
1064 .await?
1065 {
1066 if let Some(obj) = response.as_object_mut() {
1067 obj.insert("external_action".to_string(), external_action);
1068 }
1069 }
1070 Ok(response)
1071 }
1072 ParsedWorkflowAction::Capability { capability_id } => {
1073 let bindings = state.capability_resolver.list_bindings().await?;
1074 let tool_name = bindings
1075 .bindings
1076 .iter()
1077 .find(|binding| binding.capability_id == capability_id)
1078 .map(|binding| binding.tool_name.clone())
1079 .unwrap_or_else(|| capability_id.clone());
1080 let payload = action_payload(action_spec, action_row);
1081 let result = state.tools.execute(&tool_name, payload.clone()).await?;
1082 let mut response = json!({
1083 "capability": capability_id,
1084 "tool": tool_name,
1085 "output": result.output,
1086 "metadata": result.metadata,
1087 });
1088 if let Some(external_action) = record_workflow_external_action(
1089 state,
1090 run_id,
1091 workflow_id,
1092 action_row,
1093 trigger_event.clone(),
1094 WorkflowExternalActionExecution::Capability {
1095 capability_id: capability_id.clone(),
1096 tool_name: tool_name.clone(),
1097 },
1098 &payload,
1099 &response,
1100 )
1101 .await?
1102 {
1103 if let Some(obj) = response.as_object_mut() {
1104 obj.insert("external_action".to_string(), external_action);
1105 }
1106 }
1107 Ok(response)
1108 }
1109 ParsedWorkflowAction::Workflow { workflow_id } => {
1110 anyhow::bail!("nested workflow action `{workflow_id}` is not supported in this slice")
1111 }
1112 ParsedWorkflowAction::Agent { agent_id } => {
1113 let workspace_root = state.workspace_index.snapshot().await.root;
1114 let session = Session::new(
1115 Some(format!("Workflow {} / {}", workflow_id, agent_id)),
1116 Some(workspace_root.clone()),
1117 );
1118 let session_id = session.id.clone();
1119 state.storage.save_session(session).await?;
1120 let prompt = action_spec
1121 .with
1122 .as_ref()
1123 .and_then(|v| v.get("prompt"))
1124 .and_then(|v| v.as_str())
1125 .map(ToString::to_string)
1126 .unwrap_or_else(|| format!("Execute workflow action `{}`", action_name));
1127 let request = SendMessageRequest {
1128 parts: vec![MessagePartInput::Text { text: prompt }],
1129 model: None,
1130 agent: Some(agent_id.clone()),
1131 tool_mode: None,
1132 tool_allowlist: None,
1133 strict_kb_grounding: None,
1134 context_mode: None,
1135 write_required: None,
1136 prewrite_requirements: None,
1137 };
1138 state
1139 .engine_loop
1140 .run_prompt_async_with_context(
1141 session_id.clone(),
1142 request,
1143 Some(format!("workflow:{workflow_id}")),
1144 )
1145 .await?;
1146 Ok(json!({ "agentID": agent_id, "sessionID": session_id }))
1147 }
1148 }
1149}
1150
/// Describes how a workflow action reached a tool, so that
/// `record_workflow_external_action` can pick the capability binding to
/// attribute the call to.
enum WorkflowExternalActionExecution {
    /// The action named a tool directly.
    Tool {
        /// Name of the tool that was executed.
        tool_name: String,
    },
    /// The action named a capability, which was resolved to a tool.
    Capability {
        /// Identifier of the capability named by the action.
        capability_id: String,
        /// Name of the tool the capability was resolved to.
        tool_name: String,
    },
}
1160
1161async fn record_workflow_external_action(
1162 state: &AppState,
1163 run_id: &str,
1164 workflow_id: &str,
1165 action_row: &WorkflowActionRunRecord,
1166 trigger_event: Option<String>,
1167 execution: WorkflowExternalActionExecution,
1168 payload: &Value,
1169 result: &Value,
1170) -> anyhow::Result<Option<Value>> {
1171 let bindings = state.capability_resolver.list_bindings().await?;
1172 let binding = match execution {
1173 WorkflowExternalActionExecution::Tool { ref tool_name } => bindings
1174 .bindings
1175 .iter()
1176 .find(|binding| workflow_binding_matches_tool_name(binding, tool_name)),
1177 WorkflowExternalActionExecution::Capability {
1178 ref capability_id,
1179 ref tool_name,
1180 } => bindings.bindings.iter().find(|binding| {
1181 binding.capability_id == *capability_id
1182 && workflow_binding_matches_tool_name(binding, tool_name)
1183 }),
1184 };
1185 let Some(binding) = binding else {
1186 return Ok(None);
1187 };
1188
1189 let target = workflow_external_action_target(payload, result);
1190 let source_id = format!("{run_id}:{}", action_row.action_id);
1191 let idempotency_key = crate::sha256_hex(&[
1192 workflow_id,
1193 run_id,
1194 &action_row.action_id,
1195 &action_row.action,
1196 &payload.to_string(),
1197 ]);
1198 let action = crate::ExternalActionRecord {
1199 action_id: format!("workflow-external-{}", &idempotency_key[..16]),
1200 operation: binding.capability_id.clone(),
1201 status: "posted".to_string(),
1202 source_kind: Some("workflow".to_string()),
1203 source_id: Some(source_id.clone()),
1204 routine_run_id: None,
1205 context_run_id: Some(crate::http::context_runs::workflow_context_run_id(run_id)),
1206 capability_id: Some(binding.capability_id.clone()),
1207 provider: Some(binding.provider.clone()),
1208 target,
1209 approval_state: Some("executed".to_string()),
1210 idempotency_key: Some(idempotency_key),
1211 receipt: Some(result.clone()),
1212 error: None,
1213 metadata: Some(json!({
1214 "workflowID": workflow_id,
1215 "workflowRunID": run_id,
1216 "actionID": action_row.action_id,
1217 "action": action_row.action,
1218 "taskID": action_row.task_id,
1219 "triggerEvent": trigger_event,
1220 "tool": binding.tool_name,
1221 "provider": binding.provider,
1222 "input": payload,
1223 })),
1224 created_at_ms: action_row.updated_at_ms,
1225 updated_at_ms: action_row.updated_at_ms,
1226 };
1227 let recorded = state.record_external_action(action).await?;
1228 Ok(Some(serde_json::to_value(&recorded)?))
1229}
1230
1231fn workflow_binding_matches_tool_name(
1232 binding: &crate::capability_resolver::CapabilityBinding,
1233 tool_name: &str,
1234) -> bool {
1235 binding.tool_name.eq_ignore_ascii_case(tool_name)
1236 || binding
1237 .tool_name_aliases
1238 .iter()
1239 .any(|alias| alias.eq_ignore_ascii_case(tool_name))
1240}
1241
1242fn workflow_external_action_target(payload: &Value, result: &Value) -> Option<String> {
1243 for candidate in [
1244 payload.pointer("/owner_repo").and_then(Value::as_str),
1245 payload.pointer("/repo").and_then(Value::as_str),
1246 payload.pointer("/repository").and_then(Value::as_str),
1247 payload.pointer("/channel").and_then(Value::as_str),
1248 payload.pointer("/channel_id").and_then(Value::as_str),
1249 payload.pointer("/thread_ts").and_then(Value::as_str),
1250 result.pointer("/metadata/channel").and_then(Value::as_str),
1251 result.pointer("/metadata/repo").and_then(Value::as_str),
1252 ] {
1253 let trimmed = candidate.map(str::trim).unwrap_or_default();
1254 if !trimmed.is_empty() {
1255 return Some(trimmed.to_string());
1256 }
1257 }
1258 None
1259}
1260
1261fn action_payload(action_spec: &WorkflowActionSpec, action_row: &WorkflowActionRunRecord) -> Value {
1262 action_spec
1263 .with
1264 .clone()
1265 .unwrap_or_else(|| json!({ "action_id": action_row.action_id }))
1266}
1267
1268fn merge_object(current: Value, patch: Value) -> Value {
1269 if let (Some(mut current_obj), Some(patch_obj)) =
1270 (current.as_object().cloned(), patch.as_object())
1271 {
1272 for (key, value) in patch_obj {
1273 current_obj.insert(key.clone(), value.clone());
1274 }
1275 Value::Object(current_obj)
1276 } else {
1277 patch
1278 }
1279}
1280
1281fn source_event_id(event: &EngineEvent) -> String {
1282 if let Some(id) = event.properties.get("event_id").and_then(|v| v.as_str()) {
1283 return id.to_string();
1284 }
1285 for key in ["runID", "runId", "task_id", "taskID", "sessionID"] {
1286 if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1287 return format!("{}:{id}", event.event_type);
1288 }
1289 }
1290 format!("{}:{}", event.event_type, event.properties)
1291}
1292
1293fn task_id_from_event(event: &EngineEvent) -> Option<String> {
1294 for key in ["task_id", "taskID", "step_id", "stepID"] {
1295 if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1296 let trimmed = id.trim();
1297 if !trimmed.is_empty() {
1298 return Some(trimmed.to_string());
1299 }
1300 }
1301 }
1302 None
1303}
1304
/// Compares two event names, ignoring surrounding whitespace and ASCII case.
fn event_name_matches(expected: &str, actual: &str) -> bool {
    let wanted = expected.trim();
    let seen = actual.trim();
    wanted.eq_ignore_ascii_case(seen)
}