1use anyhow::Context;
2use serde_json::{json, Value};
3use tandem_types::{EngineEvent, MessagePartInput, SendMessageRequest, Session};
4use tandem_workflows::{
5 WorkflowActionRunRecord, WorkflowActionRunStatus, WorkflowActionSpec, WorkflowHookBinding,
6 WorkflowRunRecord, WorkflowRunStatus, WorkflowSimulationResult, WorkflowSpec,
7};
8use uuid::Uuid;
9
10use crate::{now_ms, AppState, WorkflowSourceRef};
11
12#[derive(Debug, Clone)]
13struct PreparedWorkflowAction {
14 action_id: String,
15 spec: WorkflowActionSpec,
16}
17
18fn workflow_action_objective(action: &str, with: Option<&Value>) -> String {
19 match with {
20 Some(with) if !with.is_null() => {
21 format!("Execute workflow action `{action}` with payload {with}.")
22 }
23 _ => format!("Execute workflow action `{action}`."),
24 }
25}
26
27fn workflow_manual_schedule() -> crate::AutomationV2Schedule {
28 crate::AutomationV2Schedule {
29 schedule_type: crate::AutomationV2ScheduleType::Manual,
30 cron_expression: None,
31 interval_seconds: None,
32 timezone: "UTC".to_string(),
33 misfire_policy: crate::RoutineMisfirePolicy::RunOnce,
34 }
35}
36
37fn workflow_execution_plan(
38 workflow_id: &str,
39 name: &str,
40 description: Option<String>,
41 actions: &[PreparedWorkflowAction],
42 source_label: &str,
43 source: Option<&WorkflowSourceRef>,
44 trigger_event: Option<&str>,
45) -> crate::WorkflowPlan {
46 crate::WorkflowPlan {
47 plan_id: format!("workflow-plan-{workflow_id}"),
48 planner_version: "workflow_runtime_v1".to_string(),
49 plan_source: source_label.to_string(),
50 original_prompt: description.clone().unwrap_or_else(|| name.to_string()),
51 normalized_prompt: description.clone().unwrap_or_else(|| name.to_string()),
52 confidence: "high".to_string(),
53 title: name.to_string(),
54 description,
55 schedule: workflow_manual_schedule(),
56 execution_target: "automation_v2".to_string(),
57 workspace_root: std::env::current_dir()
58 .unwrap_or_else(|_| std::path::PathBuf::from("."))
59 .to_string_lossy()
60 .to_string(),
61 steps: actions
62 .iter()
63 .map(|action| crate::WorkflowPlanStep {
64 step_id: action.action_id.clone(),
65 kind: "workflow_action".to_string(),
66 objective: workflow_action_objective(
67 &action.spec.action,
68 action.spec.with.as_ref(),
69 ),
70 depends_on: Vec::new(),
71 agent_role: "operator".to_string(),
72 input_refs: Vec::new(),
73 output_contract: Some(crate::AutomationFlowOutputContract {
74 kind: "generic_artifact".to_string(),
75 validator: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
76 enforcement: None,
77 schema: None,
78 summary_guidance: None,
79 }),
80 metadata: None,
81 })
82 .collect(),
83 requires_integrations: Vec::new(),
84 allowed_mcp_servers: Vec::new(),
85 operator_preferences: Some(json!({
86 "source": source_label,
87 "tool_access_mode": "auto",
88 })),
89 save_options: json!({
90 "origin": source_label,
91 "workflow_source": source,
92 "trigger_event": trigger_event,
93 }),
94 }
95}
96
97pub(crate) fn compile_workflow_spec_to_automation_preview(
98 workflow: &WorkflowSpec,
99) -> crate::AutomationV2Spec {
100 let actions = workflow
101 .steps
102 .iter()
103 .map(|step| PreparedWorkflowAction {
104 action_id: step.step_id.clone(),
105 spec: WorkflowActionSpec {
106 action: step.action.clone(),
107 with: step.with.clone(),
108 },
109 })
110 .collect::<Vec<_>>();
111 let mut automation = crate::http::compile_plan_to_automation_v2(
112 &workflow_execution_plan(
113 &workflow.workflow_id,
114 &workflow.name,
115 workflow.description.clone(),
116 &actions,
117 "workflow_registry",
118 workflow.source.as_ref(),
119 None,
120 ),
121 None,
122 "workflow_registry",
123 );
124 if let Some(metadata) = automation.metadata.as_mut().and_then(Value::as_object_mut) {
125 metadata.insert("workflow_id".to_string(), json!(workflow.workflow_id));
126 metadata.insert("workflow_name".to_string(), json!(workflow.name));
127 metadata.insert("workflow_source".to_string(), json!(workflow.source));
128 metadata.insert("workflow_enabled".to_string(), json!(workflow.enabled));
129 }
130 automation
131}
132
133fn compile_workflow_run_automation(
134 workflow_id: &str,
135 workflow_name: Option<&str>,
136 workflow_description: Option<&str>,
137 binding_id: Option<&str>,
138 actions: &[PreparedWorkflowAction],
139 source: Option<&WorkflowSourceRef>,
140 trigger_event: Option<&str>,
141) -> crate::AutomationV2Spec {
142 let automation_id = binding_id
143 .map(|binding| format!("workflow-hook-automation-{workflow_id}-{binding}"))
144 .unwrap_or_else(|| format!("workflow-automation-{workflow_id}"));
145 let title = binding_id
146 .map(|binding| {
147 workflow_name
148 .map(|name| format!("{name} hook {binding}"))
149 .unwrap_or_else(|| format!("Workflow Hook {workflow_id}:{binding}"))
150 })
151 .unwrap_or_else(|| {
152 workflow_name
153 .map(|name| format!("{name} execution"))
154 .unwrap_or_else(|| format!("Workflow {workflow_id}"))
155 });
156 let mut automation = crate::http::compile_plan_to_automation_v2(
157 &workflow_execution_plan(
158 &automation_id,
159 &title,
160 Some(
161 workflow_description
162 .map(|description| description.to_string())
163 .unwrap_or_else(|| format!("Mirrored workflow execution for `{workflow_id}`.")),
164 ),
165 actions,
166 "workflow_runtime",
167 source,
168 trigger_event,
169 ),
170 None,
171 "workflow_runtime",
172 );
173 automation.automation_id = automation_id.clone();
174 automation.name = title;
175 automation.metadata = Some(json!({
176 "workflow_id": workflow_id,
177 "binding_id": binding_id,
178 "workflow_source": source,
179 "trigger_event": trigger_event,
180 "origin": "workflow_runtime_mirror",
181 }));
182 automation
183}
184
185async fn sync_workflow_automation_run_start(
186 state: &AppState,
187 automation: &crate::AutomationV2Spec,
188 run_id: &str,
189) -> anyhow::Result<crate::AutomationV2RunRecord> {
190 let updated = state
191 .update_automation_v2_run(run_id, |run| {
192 run.status = crate::AutomationRunStatus::Running;
193 run.started_at_ms.get_or_insert_with(now_ms);
194 crate::app::state::automation::lifecycle::record_automation_lifecycle_event(
195 run,
196 "workflow_run_started",
197 Some("workflow runtime mirror started".to_string()),
198 None,
199 );
200 crate::app::state::refresh_automation_runtime_state(automation, run);
201 })
202 .await
203 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
204 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
205 .await
206 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
207 Ok(updated)
208}
209
210async fn sync_workflow_automation_action_started(
211 state: &AppState,
212 automation: &crate::AutomationV2Spec,
213 run_id: &str,
214 action_id: &str,
215) -> anyhow::Result<crate::AutomationV2RunRecord> {
216 let updated = state
217 .update_automation_v2_run(run_id, |run| {
218 let next_attempt = run
219 .checkpoint
220 .node_attempts
221 .get(action_id)
222 .copied()
223 .unwrap_or(0)
224 .saturating_add(1);
225 run.checkpoint
226 .node_attempts
227 .insert(action_id.to_string(), next_attempt);
228 run.detail = Some(format!("Running workflow action `{action_id}`"));
229 crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
230 run,
231 "workflow_action_started",
232 Some(format!("workflow action `{action_id}` started")),
233 None,
234 Some(json!({
235 "action_id": action_id,
236 "attempt": next_attempt,
237 })),
238 );
239 crate::app::state::refresh_automation_runtime_state(automation, run);
240 })
241 .await
242 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
243 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
244 .await
245 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
246 Ok(updated)
247}
248
249async fn sync_workflow_automation_action_completed(
250 state: &AppState,
251 automation: &crate::AutomationV2Spec,
252 run_id: &str,
253 action_id: &str,
254 output: &Value,
255) -> anyhow::Result<crate::AutomationV2RunRecord> {
256 let action_count = automation.flow.nodes.len();
257 let updated = state
258 .update_automation_v2_run(run_id, |run| {
259 run.checkpoint.pending_nodes.retain(|id| id != action_id);
260 if !run
261 .checkpoint
262 .completed_nodes
263 .iter()
264 .any(|id| id == action_id)
265 {
266 run.checkpoint.completed_nodes.push(action_id.to_string());
267 }
268 run.checkpoint.node_outputs.insert(
269 action_id.to_string(),
270 json!(crate::AutomationNodeOutput {
271 contract_kind: "generic_artifact".to_string(),
272 validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
273 validator_summary: Some(crate::AutomationValidatorSummary {
274 kind: crate::AutomationOutputValidatorKind::GenericArtifact,
275 outcome: "accepted".to_string(),
276 reason: Some("workflow action completed".to_string()),
277 unmet_requirements: Vec::new(),
278 warning_requirements: Vec::new(),
279 warning_count: 0,
280 accepted_candidate_source: Some("workflow_runtime".to_string()),
281 verification_outcome: Some("not_applicable".to_string()),
282 validation_basis: None,
283 repair_attempted: false,
284 repair_attempt: 0,
285 repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
286 as u32,
287 repair_succeeded: false,
288 repair_exhausted: false,
289 }),
290 summary: format!("Workflow action `{action_id}` completed"),
291 content: json!({
292 "action_id": action_id,
293 "output": output,
294 }),
295 created_at_ms: now_ms(),
296 node_id: action_id.to_string(),
297 status: Some("completed".to_string()),
298 blocked_reason: None,
299 approved: None,
300 workflow_class: Some("workflow_action".to_string()),
301 phase: Some("execution".to_string()),
302 failure_kind: None,
303 tool_telemetry: None,
304 preflight: None,
305 capability_resolution: None,
306 attempt_evidence: None,
307 blocker_category: None,
308 fallback_used: None,
309 artifact_validation: None,
310 receipt_timeline: None,
311 quality_mode: Some("strict_research_v1".to_string()),
312 requested_quality_mode: None,
313 emergency_rollback_enabled: Some(false),
314 provenance: Some(crate::AutomationNodeOutputProvenance {
315 session_id: format!("workflow-runtime-{run_id}"),
316 node_id: action_id.to_string(),
317 run_id: Some(run_id.to_string()),
318 output_path: None,
319 content_digest: None,
320 accepted_candidate_source: Some("workflow_runtime".to_string()),
321 validation_outcome: Some("not_applicable".to_string()),
322 repair_attempt: Some(0),
323 repair_succeeded: Some(false),
324 reuse_allowed: Some(false),
325 freshness: crate::AutomationNodeOutputFreshness {
326 current_run: true,
327 current_attempt: true,
328 },
329 }),
330 }),
331 );
332 if run.checkpoint.completed_nodes.len() >= action_count {
333 run.status = crate::AutomationRunStatus::Completed;
334 run.detail = Some("workflow runtime mirror completed".to_string());
335 }
336 crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
337 run,
338 "workflow_action_completed",
339 Some(format!("workflow action `{action_id}` completed")),
340 None,
341 Some(json!({
342 "action_id": action_id,
343 })),
344 );
345 crate::app::state::refresh_automation_runtime_state(automation, run);
346 })
347 .await
348 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
349 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
350 .await
351 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
352 Ok(updated)
353}
354
355async fn sync_workflow_automation_action_failed(
356 state: &AppState,
357 automation: &crate::AutomationV2Spec,
358 run_id: &str,
359 action_id: &str,
360 error: &str,
361) -> anyhow::Result<crate::AutomationV2RunRecord> {
362 let updated = state
363 .update_automation_v2_run(run_id, |run| {
364 run.status = crate::AutomationRunStatus::Failed;
365 run.detail = Some(format!("Workflow action `{action_id}` failed"));
366 run.checkpoint.pending_nodes.retain(|id| id != action_id);
367 run.checkpoint.last_failure = Some(crate::AutomationFailureRecord {
368 node_id: action_id.to_string(),
369 reason: error.to_string(),
370 failed_at_ms: now_ms(),
371 });
372 run.checkpoint.node_outputs.insert(
373 action_id.to_string(),
374 json!(crate::AutomationNodeOutput {
375 contract_kind: "generic_artifact".to_string(),
376 validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
377 validator_summary: Some(crate::AutomationValidatorSummary {
378 kind: crate::AutomationOutputValidatorKind::GenericArtifact,
379 outcome: "rejected".to_string(),
380 reason: Some(error.to_string()),
381 unmet_requirements: vec![error.to_string()],
382 warning_requirements: Vec::new(),
383 warning_count: 0,
384 accepted_candidate_source: None,
385 verification_outcome: Some("failed".to_string()),
386 validation_basis: None,
387 repair_attempted: false,
388 repair_attempt: 0,
389 repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
390 as u32,
391 repair_succeeded: false,
392 repair_exhausted: false,
393 }),
394 summary: format!("Workflow action `{action_id}` failed"),
395 content: json!({
396 "action_id": action_id,
397 "error": error,
398 }),
399 created_at_ms: now_ms(),
400 node_id: action_id.to_string(),
401 status: Some("failed".to_string()),
402 blocked_reason: Some(error.to_string()),
403 approved: None,
404 workflow_class: Some("workflow_action".to_string()),
405 phase: Some("execution".to_string()),
406 failure_kind: Some("workflow_action_failed".to_string()),
407 tool_telemetry: None,
408 preflight: None,
409 capability_resolution: None,
410 attempt_evidence: None,
411 blocker_category: Some("tool_result_unusable".to_string()),
412 fallback_used: None,
413 artifact_validation: None,
414 receipt_timeline: None,
415 quality_mode: Some("strict_research_v1".to_string()),
416 requested_quality_mode: None,
417 emergency_rollback_enabled: Some(false),
418 provenance: Some(crate::AutomationNodeOutputProvenance {
419 session_id: format!("workflow-runtime-{run_id}"),
420 node_id: action_id.to_string(),
421 run_id: Some(run_id.to_string()),
422 output_path: None,
423 content_digest: None,
424 accepted_candidate_source: None,
425 validation_outcome: Some("failed".to_string()),
426 repair_attempt: Some(0),
427 repair_succeeded: Some(false),
428 reuse_allowed: Some(false),
429 freshness: crate::AutomationNodeOutputFreshness {
430 current_run: true,
431 current_attempt: true,
432 },
433 }),
434 }),
435 );
436 crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
437 run,
438 "workflow_action_failed",
439 Some(format!("workflow action `{action_id}` failed")),
440 None,
441 Some(json!({
442 "action_id": action_id,
443 "error": error,
444 })),
445 );
446 crate::app::state::refresh_automation_runtime_state(automation, run);
447 })
448 .await
449 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
450 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
451 .await
452 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
453 Ok(updated)
454}
455
456#[derive(Debug, Clone)]
457pub enum ParsedWorkflowAction {
458 EventEmit { event_type: String },
459 ResourcePut { key: String },
460 ResourcePatch { key: String },
461 ResourceDelete { key: String },
462 Tool { tool_name: String },
463 Capability { capability_id: String },
464 Workflow { workflow_id: String },
465 Agent { agent_id: String },
466}
467
468pub fn parse_workflow_action(action: &str) -> ParsedWorkflowAction {
469 let trimmed = action.trim();
470 if let Some(rest) = trimmed.strip_prefix("event:") {
471 return ParsedWorkflowAction::EventEmit {
472 event_type: rest.trim().to_string(),
473 };
474 }
475 if let Some(rest) = trimmed.strip_prefix("resource:put:") {
476 return ParsedWorkflowAction::ResourcePut {
477 key: rest.trim().to_string(),
478 };
479 }
480 if let Some(rest) = trimmed.strip_prefix("resource:patch:") {
481 return ParsedWorkflowAction::ResourcePatch {
482 key: rest.trim().to_string(),
483 };
484 }
485 if let Some(rest) = trimmed.strip_prefix("resource:delete:") {
486 return ParsedWorkflowAction::ResourceDelete {
487 key: rest.trim().to_string(),
488 };
489 }
490 if let Some(rest) = trimmed.strip_prefix("tool:") {
491 return ParsedWorkflowAction::Tool {
492 tool_name: rest.trim().to_string(),
493 };
494 }
495 if let Some(rest) = trimmed.strip_prefix("capability:") {
496 return ParsedWorkflowAction::Capability {
497 capability_id: rest.trim().to_string(),
498 };
499 }
500 if let Some(rest) = trimmed.strip_prefix("workflow:") {
501 return ParsedWorkflowAction::Workflow {
502 workflow_id: rest.trim().to_string(),
503 };
504 }
505 if let Some(rest) = trimmed.strip_prefix("agent:") {
506 return ParsedWorkflowAction::Agent {
507 agent_id: rest.trim().to_string(),
508 };
509 }
510 ParsedWorkflowAction::Capability {
511 capability_id: trimmed.to_string(),
512 }
513}
514
515pub fn canonical_workflow_event_names(event: &EngineEvent) -> Vec<String> {
516 let mut names = vec![event.event_type.clone(), event.event_type.replace('.', "_")];
517 match event.event_type.as_str() {
518 "context.task.created" => names.push("task_created".to_string()),
519 "context.task.started" => names.push("task_started".to_string()),
520 "context.task.completed" => names.push("task_completed".to_string()),
521 "context.task.failed" => names.push("task_failed".to_string()),
522 "workflow.run.started" | "routine.run.created" => {
523 names.push("workflow_started".to_string())
524 }
525 "workflow.run.completed" | "routine.run.completed" => {
526 names.push("workflow_completed".to_string())
527 }
528 "workflow.run.failed" | "routine.run.failed" => names.push("task_failed".to_string()),
529 _ => {}
530 }
531 names.sort();
532 names.dedup();
533 names
534}
535
536pub async fn simulate_workflow_event(
537 state: &AppState,
538 event: &EngineEvent,
539) -> WorkflowSimulationResult {
540 let registry = state.workflow_registry().await;
541 let canonical = canonical_workflow_event_names(event);
542 let matched_bindings = registry
543 .hooks
544 .into_iter()
545 .filter(|hook| {
546 hook.enabled
547 && canonical
548 .iter()
549 .any(|name| event_name_matches(&hook.event, name))
550 })
551 .collect::<Vec<_>>();
552 let planned_actions = matched_bindings
553 .iter()
554 .flat_map(|hook| hook.actions.clone())
555 .collect::<Vec<_>>();
556 WorkflowSimulationResult {
557 matched_bindings,
558 planned_actions,
559 canonical_events: canonical,
560 }
561}
562
563pub async fn dispatch_workflow_event(state: &AppState, event: &EngineEvent) {
564 let simulation = simulate_workflow_event(state, event).await;
565 if simulation.matched_bindings.is_empty() {
566 return;
567 }
568 for hook in simulation.matched_bindings {
569 let source_event_id = source_event_id(event);
570 let task_id = task_id_from_event(event);
571 let dedupe_key = format!("{}::{source_event_id}", hook.binding_id);
572 {
573 let mut seen = state.workflow_dispatch_seen.write().await;
574 if seen.contains_key(&dedupe_key) {
575 continue;
576 }
577 seen.insert(dedupe_key, now_ms());
578 }
579 let _ = execute_hook_binding(
580 state,
581 &hook,
582 Some(event.event_type.clone()),
583 Some(source_event_id),
584 task_id,
585 false,
586 )
587 .await;
588 }
589}
590
591pub async fn run_workflow_dispatcher(state: AppState) {
592 let mut rx = state.event_bus.subscribe();
593 loop {
594 match rx.recv().await {
595 Ok(event) => dispatch_workflow_event(&state, &event).await,
596 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
597 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
598 }
599 }
600}
601
602pub async fn execute_workflow(
603 state: &AppState,
604 workflow: &WorkflowSpec,
605 trigger_event: Option<String>,
606 source_event_id: Option<String>,
607 task_id: Option<String>,
608 dry_run: bool,
609) -> anyhow::Result<WorkflowRunRecord> {
610 let actions = workflow
611 .steps
612 .iter()
613 .map(|step| PreparedWorkflowAction {
614 action_id: step.step_id.clone(),
615 spec: WorkflowActionSpec {
616 action: step.action.clone(),
617 with: step.with.clone(),
618 },
619 })
620 .collect::<Vec<_>>();
621 execute_actions(
622 state,
623 &workflow.workflow_id,
624 None,
625 actions,
626 Some(workflow.name.clone()),
627 workflow.description.clone(),
628 workflow.source.clone(),
629 trigger_event,
630 source_event_id,
631 task_id,
632 dry_run,
633 )
634 .await
635}
636
637pub async fn execute_hook_binding(
638 state: &AppState,
639 hook: &WorkflowHookBinding,
640 trigger_event: Option<String>,
641 source_event_id: Option<String>,
642 task_id: Option<String>,
643 dry_run: bool,
644) -> anyhow::Result<WorkflowRunRecord> {
645 let workflow = state
646 .get_workflow(&hook.workflow_id)
647 .await
648 .with_context(|| format!("unknown workflow `{}`", hook.workflow_id))?;
649 execute_actions(
650 state,
651 &hook.workflow_id,
652 Some(hook.binding_id.clone()),
653 hook.actions
654 .iter()
655 .enumerate()
656 .map(|(idx, action)| PreparedWorkflowAction {
657 action_id: format!("action_{}", idx + 1),
658 spec: action.clone(),
659 })
660 .collect(),
661 None,
662 None,
663 workflow.source,
664 trigger_event,
665 source_event_id,
666 task_id,
667 dry_run,
668 )
669 .await
670}
671
672async fn execute_actions(
673 state: &AppState,
674 workflow_id: &str,
675 binding_id: Option<String>,
676 actions: Vec<PreparedWorkflowAction>,
677 workflow_name: Option<String>,
678 workflow_description: Option<String>,
679 source: Option<WorkflowSourceRef>,
680 trigger_event: Option<String>,
681 source_event_id: Option<String>,
682 task_id: Option<String>,
683 dry_run: bool,
684) -> anyhow::Result<WorkflowRunRecord> {
685 let run_id = format!("workflow-run-{}", Uuid::new_v4());
686 let now = now_ms();
687 let automation = compile_workflow_run_automation(
688 workflow_id,
689 workflow_name.as_deref(),
690 workflow_description.as_deref(),
691 binding_id.as_deref(),
692 &actions,
693 source.as_ref(),
694 trigger_event.as_deref(),
695 );
696 let automation = state.put_automation_v2(automation).await?;
697 let automation_run = state
698 .create_automation_v2_run(&automation, trigger_event.as_deref().unwrap_or("workflow"))
699 .await?;
700 let automation_run =
701 sync_workflow_automation_run_start(state, &automation, &automation_run.run_id).await?;
702 let mut run = WorkflowRunRecord {
703 run_id: run_id.clone(),
704 workflow_id: workflow_id.to_string(),
705 automation_id: Some(automation.automation_id.clone()),
706 automation_run_id: Some(automation_run.run_id.clone()),
707 binding_id,
708 trigger_event: trigger_event.clone(),
709 source_event_id: source_event_id.clone(),
710 task_id: task_id.clone(),
711 status: if dry_run {
712 WorkflowRunStatus::DryRun
713 } else {
714 WorkflowRunStatus::Running
715 },
716 created_at_ms: now,
717 updated_at_ms: now,
718 finished_at_ms: if dry_run { Some(now) } else { None },
719 actions: actions
720 .iter()
721 .map(|action| WorkflowActionRunRecord {
722 action_id: action.action_id.clone(),
723 action: action.spec.action.clone(),
724 task_id: task_id.clone(),
725 status: if dry_run {
726 WorkflowActionRunStatus::Skipped
727 } else {
728 WorkflowActionRunStatus::Pending
729 },
730 detail: None,
731 output: None,
732 updated_at_ms: now,
733 })
734 .collect(),
735 source,
736 };
737 state.put_workflow_run(run.clone()).await?;
738 let _ = crate::http::sync_workflow_run_blackboard(state, &run).await;
739 state.event_bus.publish(EngineEvent::new(
740 "workflow.run.started",
741 json!({
742 "runID": run.run_id,
743 "workflowID": run.workflow_id,
744 "bindingID": run.binding_id,
745 "triggerEvent": trigger_event,
746 "sourceEventID": source_event_id,
747 "taskID": task_id,
748 "dryRun": dry_run,
749 }),
750 ));
751 if dry_run {
752 return Ok(run);
753 }
754 for (action_row, action_spec) in run.actions.iter_mut().zip(actions.iter()) {
755 action_row.status = WorkflowActionRunStatus::Running;
756 action_row.updated_at_ms = now_ms();
757 let action_name = action_row.action.clone();
758 let _ = sync_workflow_automation_action_started(
759 state,
760 &automation,
761 automation_run.run_id.as_str(),
762 &action_row.action_id,
763 )
764 .await;
765 state
766 .update_workflow_run(&run.run_id, |row| {
767 if let Some(target) = row
768 .actions
769 .iter_mut()
770 .find(|item| item.action_id == action_row.action_id)
771 {
772 *target = action_row.clone();
773 }
774 })
775 .await;
776 if let Some(latest) = state.get_workflow_run(&run.run_id).await {
777 let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
778 }
779 state.event_bus.publish(EngineEvent::new(
780 "workflow.action.started",
781 json!({
782 "runID": run.run_id,
783 "workflowID": run.workflow_id,
784 "actionID": action_row.action_id,
785 "action": action_name,
786 "taskID": run.task_id,
787 }),
788 ));
789 match execute_action(
790 state,
791 &run.run_id,
792 workflow_id,
793 &action_spec.spec,
794 action_row,
795 trigger_event.clone(),
796 )
797 .await
798 {
799 Ok(output) => {
800 action_row.status = WorkflowActionRunStatus::Completed;
801 action_row.output = Some(output.clone());
802 action_row.updated_at_ms = now_ms();
803 state
804 .update_workflow_run(&run.run_id, |row| {
805 if let Some(target) = row
806 .actions
807 .iter_mut()
808 .find(|item| item.action_id == action_row.action_id)
809 {
810 *target = action_row.clone();
811 }
812 })
813 .await;
814 let _ = sync_workflow_automation_action_completed(
815 state,
816 &automation,
817 automation_run.run_id.as_str(),
818 &action_row.action_id,
819 &output,
820 )
821 .await;
822 if let Some(latest) = state.get_workflow_run(&run.run_id).await {
823 let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
824 }
825 state.event_bus.publish(EngineEvent::new(
826 "workflow.action.completed",
827 json!({
828 "runID": run.run_id,
829 "workflowID": run.workflow_id,
830 "actionID": action_row.action_id,
831 "action": action_name,
832 "taskID": run.task_id,
833 "output": output,
834 }),
835 ));
836 }
837 Err(error) => {
838 let detail = error.to_string();
839 action_row.status = WorkflowActionRunStatus::Failed;
840 action_row.detail = Some(detail.clone());
841 action_row.updated_at_ms = now_ms();
842 run.status = WorkflowRunStatus::Failed;
843 run.finished_at_ms = Some(now_ms());
844 state
845 .update_workflow_run(&run.run_id, |row| {
846 row.status = WorkflowRunStatus::Failed;
847 row.finished_at_ms = Some(now_ms());
848 if let Some(target) = row
849 .actions
850 .iter_mut()
851 .find(|item| item.action_id == action_row.action_id)
852 {
853 *target = action_row.clone();
854 }
855 })
856 .await;
857 let _ = sync_workflow_automation_action_failed(
858 state,
859 &automation,
860 automation_run.run_id.as_str(),
861 &action_row.action_id,
862 &detail,
863 )
864 .await;
865 if let Some(latest) = state.get_workflow_run(&run.run_id).await {
866 let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
867 }
868 state.event_bus.publish(EngineEvent::new(
869 "workflow.action.failed",
870 json!({
871 "runID": run.run_id,
872 "workflowID": run.workflow_id,
873 "actionID": action_row.action_id,
874 "action": action_name,
875 "taskID": run.task_id,
876 "error": detail,
877 }),
878 ));
879 state.event_bus.publish(EngineEvent::new(
880 "workflow.run.failed",
881 json!({
882 "runID": run.run_id,
883 "workflowID": run.workflow_id,
884 "actionID": action_row.action_id,
885 "taskID": run.task_id,
886 "error": action_row.detail,
887 }),
888 ));
889 return state.get_workflow_run(&run.run_id).await.with_context(|| {
890 format!("workflow run `{}` missing after failure", run.run_id)
891 });
892 }
893 }
894 }
895 run.status = WorkflowRunStatus::Completed;
896 run.finished_at_ms = Some(now_ms());
897 let final_run = state
898 .update_workflow_run(&run.run_id, |row| {
899 row.status = WorkflowRunStatus::Completed;
900 row.finished_at_ms = Some(now_ms());
901 })
902 .await
903 .with_context(|| format!("workflow run `{}` missing on completion", run.run_id))?;
904 let _ = crate::http::sync_workflow_run_blackboard(state, &final_run).await;
905 state.event_bus.publish(EngineEvent::new(
906 "workflow.run.completed",
907 json!({
908 "runID": final_run.run_id,
909 "workflowID": final_run.workflow_id,
910 "bindingID": final_run.binding_id,
911 "taskID": final_run.task_id,
912 }),
913 ));
914 Ok(final_run)
915}
916
917async fn execute_action(
918 state: &AppState,
919 run_id: &str,
920 workflow_id: &str,
921 action_spec: &WorkflowActionSpec,
922 action_row: &WorkflowActionRunRecord,
923 trigger_event: Option<String>,
924) -> anyhow::Result<Value> {
925 let action_name = action_spec.action.as_str();
926 let parsed = parse_workflow_action(action_name);
927 match parsed {
928 ParsedWorkflowAction::EventEmit { event_type } => {
929 let payload = action_payload(action_spec, action_row);
930 state.event_bus.publish(EngineEvent::new(
931 event_type.clone(),
932 json!({
933 "workflowID": workflow_id,
934 "actionID": action_row.action_id,
935 "triggerEvent": trigger_event,
936 "payload": payload,
937 }),
938 ));
939 Ok(json!({ "eventType": event_type }))
940 }
941 ParsedWorkflowAction::ResourcePut { key } => {
942 let record = state
943 .put_shared_resource(
944 key.clone(),
945 action_payload(action_spec, action_row),
946 None,
947 "workflow".to_string(),
948 None,
949 )
950 .await
951 .map_err(|err| anyhow::anyhow!("{err:?}"))?;
952 Ok(json!({ "key": record.key, "rev": record.rev }))
953 }
954 ParsedWorkflowAction::ResourcePatch { key } => {
955 let current = state.get_shared_resource(&key).await;
956 let next_rev = current.as_ref().map(|row| row.rev);
957 let record = state
958 .put_shared_resource(
959 key.clone(),
960 merge_object(
961 current.map(|row| row.value).unwrap_or_else(|| json!({})),
962 action_payload(action_spec, action_row),
963 ),
964 next_rev,
965 "workflow".to_string(),
966 None,
967 )
968 .await
969 .map_err(|err| anyhow::anyhow!("{err:?}"))?;
970 Ok(json!({ "key": record.key, "rev": record.rev }))
971 }
972 ParsedWorkflowAction::ResourceDelete { key } => {
973 let deleted = state
974 .delete_shared_resource(&key, None)
975 .await
976 .map_err(|err| anyhow::anyhow!("{err:?}"))?;
977 Ok(json!({ "key": key, "deleted": deleted.is_some() }))
978 }
979 ParsedWorkflowAction::Tool { tool_name } => {
980 let payload = action_payload(action_spec, action_row);
981 let result = state.tools.execute(&tool_name, payload.clone()).await?;
982 let mut response = json!({
983 "tool": tool_name,
984 "output": result.output,
985 "metadata": result.metadata,
986 });
987 if let Some(external_action) = record_workflow_external_action(
988 state,
989 run_id,
990 workflow_id,
991 action_row,
992 trigger_event.clone(),
993 WorkflowExternalActionExecution::Tool {
994 tool_name: tool_name.clone(),
995 },
996 &payload,
997 &response,
998 )
999 .await?
1000 {
1001 if let Some(obj) = response.as_object_mut() {
1002 obj.insert("external_action".to_string(), external_action);
1003 }
1004 }
1005 Ok(response)
1006 }
1007 ParsedWorkflowAction::Capability { capability_id } => {
1008 let bindings = state.capability_resolver.list_bindings().await?;
1009 let tool_name = bindings
1010 .bindings
1011 .iter()
1012 .find(|binding| binding.capability_id == capability_id)
1013 .map(|binding| binding.tool_name.clone())
1014 .unwrap_or_else(|| capability_id.clone());
1015 let payload = action_payload(action_spec, action_row);
1016 let result = state.tools.execute(&tool_name, payload.clone()).await?;
1017 let mut response = json!({
1018 "capability": capability_id,
1019 "tool": tool_name,
1020 "output": result.output,
1021 "metadata": result.metadata,
1022 });
1023 if let Some(external_action) = record_workflow_external_action(
1024 state,
1025 run_id,
1026 workflow_id,
1027 action_row,
1028 trigger_event.clone(),
1029 WorkflowExternalActionExecution::Capability {
1030 capability_id: capability_id.clone(),
1031 tool_name: tool_name.clone(),
1032 },
1033 &payload,
1034 &response,
1035 )
1036 .await?
1037 {
1038 if let Some(obj) = response.as_object_mut() {
1039 obj.insert("external_action".to_string(), external_action);
1040 }
1041 }
1042 Ok(response)
1043 }
1044 ParsedWorkflowAction::Workflow { workflow_id } => {
1045 anyhow::bail!("nested workflow action `{workflow_id}` is not supported in this slice")
1046 }
1047 ParsedWorkflowAction::Agent { agent_id } => {
1048 let workspace_root = state.workspace_index.snapshot().await.root;
1049 let session = Session::new(
1050 Some(format!("Workflow {} / {}", workflow_id, agent_id)),
1051 Some(workspace_root.clone()),
1052 );
1053 let session_id = session.id.clone();
1054 state.storage.save_session(session).await?;
1055 let prompt = action_spec
1056 .with
1057 .as_ref()
1058 .and_then(|v| v.get("prompt"))
1059 .and_then(|v| v.as_str())
1060 .map(ToString::to_string)
1061 .unwrap_or_else(|| format!("Execute workflow action `{}`", action_name));
1062 let request = SendMessageRequest {
1063 parts: vec![MessagePartInput::Text { text: prompt }],
1064 model: None,
1065 agent: Some(agent_id.clone()),
1066 tool_mode: None,
1067 tool_allowlist: None,
1068 context_mode: None,
1069 write_required: None,
1070 prewrite_requirements: None,
1071 };
1072 state
1073 .engine_loop
1074 .run_prompt_async_with_context(
1075 session_id.clone(),
1076 request,
1077 Some(format!("workflow:{workflow_id}")),
1078 )
1079 .await?;
1080 Ok(json!({ "agentID": agent_id, "sessionID": session_id }))
1081 }
1082 }
1083}
1084
1085enum WorkflowExternalActionExecution {
1086 Tool {
1087 tool_name: String,
1088 },
1089 Capability {
1090 capability_id: String,
1091 tool_name: String,
1092 },
1093}
1094
1095async fn record_workflow_external_action(
1096 state: &AppState,
1097 run_id: &str,
1098 workflow_id: &str,
1099 action_row: &WorkflowActionRunRecord,
1100 trigger_event: Option<String>,
1101 execution: WorkflowExternalActionExecution,
1102 payload: &Value,
1103 result: &Value,
1104) -> anyhow::Result<Option<Value>> {
1105 let bindings = state.capability_resolver.list_bindings().await?;
1106 let binding = match execution {
1107 WorkflowExternalActionExecution::Tool { ref tool_name } => bindings
1108 .bindings
1109 .iter()
1110 .find(|binding| workflow_binding_matches_tool_name(binding, tool_name)),
1111 WorkflowExternalActionExecution::Capability {
1112 ref capability_id,
1113 ref tool_name,
1114 } => bindings.bindings.iter().find(|binding| {
1115 binding.capability_id == *capability_id
1116 && workflow_binding_matches_tool_name(binding, tool_name)
1117 }),
1118 };
1119 let Some(binding) = binding else {
1120 return Ok(None);
1121 };
1122
1123 let target = workflow_external_action_target(payload, result);
1124 let source_id = format!("{run_id}:{}", action_row.action_id);
1125 let idempotency_key = crate::sha256_hex(&[
1126 workflow_id,
1127 run_id,
1128 &action_row.action_id,
1129 &action_row.action,
1130 &payload.to_string(),
1131 ]);
1132 let action = crate::ExternalActionRecord {
1133 action_id: format!("workflow-external-{}", &idempotency_key[..16]),
1134 operation: binding.capability_id.clone(),
1135 status: "posted".to_string(),
1136 source_kind: Some("workflow".to_string()),
1137 source_id: Some(source_id.clone()),
1138 routine_run_id: None,
1139 context_run_id: Some(crate::http::context_runs::workflow_context_run_id(run_id)),
1140 capability_id: Some(binding.capability_id.clone()),
1141 provider: Some(binding.provider.clone()),
1142 target,
1143 approval_state: Some("executed".to_string()),
1144 idempotency_key: Some(idempotency_key),
1145 receipt: Some(result.clone()),
1146 error: None,
1147 metadata: Some(json!({
1148 "workflowID": workflow_id,
1149 "workflowRunID": run_id,
1150 "actionID": action_row.action_id,
1151 "action": action_row.action,
1152 "taskID": action_row.task_id,
1153 "triggerEvent": trigger_event,
1154 "tool": binding.tool_name,
1155 "provider": binding.provider,
1156 "input": payload,
1157 })),
1158 created_at_ms: action_row.updated_at_ms,
1159 updated_at_ms: action_row.updated_at_ms,
1160 };
1161 let recorded = state.record_external_action(action).await?;
1162 Ok(Some(serde_json::to_value(&recorded)?))
1163}
1164
1165fn workflow_binding_matches_tool_name(
1166 binding: &crate::capability_resolver::CapabilityBinding,
1167 tool_name: &str,
1168) -> bool {
1169 binding.tool_name.eq_ignore_ascii_case(tool_name)
1170 || binding
1171 .tool_name_aliases
1172 .iter()
1173 .any(|alias| alias.eq_ignore_ascii_case(tool_name))
1174}
1175
1176fn workflow_external_action_target(payload: &Value, result: &Value) -> Option<String> {
1177 for candidate in [
1178 payload.pointer("/owner_repo").and_then(Value::as_str),
1179 payload.pointer("/repo").and_then(Value::as_str),
1180 payload.pointer("/repository").and_then(Value::as_str),
1181 payload.pointer("/channel").and_then(Value::as_str),
1182 payload.pointer("/channel_id").and_then(Value::as_str),
1183 payload.pointer("/thread_ts").and_then(Value::as_str),
1184 result.pointer("/metadata/channel").and_then(Value::as_str),
1185 result.pointer("/metadata/repo").and_then(Value::as_str),
1186 ] {
1187 let trimmed = candidate.map(str::trim).unwrap_or_default();
1188 if !trimmed.is_empty() {
1189 return Some(trimmed.to_string());
1190 }
1191 }
1192 None
1193}
1194
1195fn action_payload(action_spec: &WorkflowActionSpec, action_row: &WorkflowActionRunRecord) -> Value {
1196 action_spec
1197 .with
1198 .clone()
1199 .unwrap_or_else(|| json!({ "action_id": action_row.action_id }))
1200}
1201
1202fn merge_object(current: Value, patch: Value) -> Value {
1203 if let (Some(mut current_obj), Some(patch_obj)) =
1204 (current.as_object().cloned(), patch.as_object())
1205 {
1206 for (key, value) in patch_obj {
1207 current_obj.insert(key.clone(), value.clone());
1208 }
1209 Value::Object(current_obj)
1210 } else {
1211 patch
1212 }
1213}
1214
1215fn source_event_id(event: &EngineEvent) -> String {
1216 if let Some(id) = event.properties.get("event_id").and_then(|v| v.as_str()) {
1217 return id.to_string();
1218 }
1219 for key in ["runID", "runId", "task_id", "taskID", "sessionID"] {
1220 if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1221 return format!("{}:{id}", event.event_type);
1222 }
1223 }
1224 format!("{}:{}", event.event_type, event.properties)
1225}
1226
1227fn task_id_from_event(event: &EngineEvent) -> Option<String> {
1228 for key in ["task_id", "taskID", "step_id", "stepID"] {
1229 if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1230 let trimmed = id.trim();
1231 if !trimmed.is_empty() {
1232 return Some(trimmed.to_string());
1233 }
1234 }
1235 }
1236 None
1237}
1238
1239fn event_name_matches(expected: &str, actual: &str) -> bool {
1240 expected.trim().eq_ignore_ascii_case(actual.trim())
1241}