1use anyhow::Context;
2use serde_json::{json, Value};
3use tandem_types::{EngineEvent, MessagePartInput, SendMessageRequest, Session};
4use tandem_workflows::{
5 WorkflowActionRunRecord, WorkflowActionRunStatus, WorkflowActionSpec, WorkflowHookBinding,
6 WorkflowRunRecord, WorkflowRunStatus, WorkflowSimulationResult, WorkflowSpec,
7};
8use uuid::Uuid;
9
10use crate::{now_ms, AppState, WorkflowSourceRef};
11
/// A workflow action paired with the identifier used to track it during a run.
#[derive(Debug, Clone)]
struct PreparedWorkflowAction {
    /// Identifier for this action within a run. Callers in this file use either
    /// a workflow step id or a generated `action_N` label.
    action_id: String,
    /// The action string and its optional `with` payload.
    spec: WorkflowActionSpec,
}
17
18fn workflow_action_objective(action: &str, with: Option<&Value>) -> String {
19 match with {
20 Some(with) if !with.is_null() => {
21 format!("Execute workflow action `{action}` with payload {with}.")
22 }
23 _ => format!("Execute workflow action `{action}`."),
24 }
25}
26
27fn workflow_manual_schedule() -> crate::AutomationV2Schedule {
28 crate::AutomationV2Schedule {
29 schedule_type: crate::AutomationV2ScheduleType::Manual,
30 cron_expression: None,
31 interval_seconds: None,
32 timezone: "UTC".to_string(),
33 misfire_policy: crate::RoutineMisfirePolicy::RunOnce,
34 }
35}
36
37fn workflow_execution_plan(
38 workflow_id: &str,
39 name: &str,
40 description: Option<String>,
41 actions: &[PreparedWorkflowAction],
42 source_label: &str,
43 source: Option<&WorkflowSourceRef>,
44 trigger_event: Option<&str>,
45) -> crate::WorkflowPlan {
46 crate::WorkflowPlan {
47 plan_id: format!("workflow-plan-{workflow_id}"),
48 planner_version: "workflow_runtime_v1".to_string(),
49 plan_source: source_label.to_string(),
50 original_prompt: description.clone().unwrap_or_else(|| name.to_string()),
51 normalized_prompt: description.clone().unwrap_or_else(|| name.to_string()),
52 confidence: "high".to_string(),
53 title: name.to_string(),
54 description,
55 schedule: workflow_manual_schedule(),
56 execution_target: "automation_v2".to_string(),
57 workspace_root: std::env::current_dir()
58 .unwrap_or_else(|_| std::path::PathBuf::from("."))
59 .to_string_lossy()
60 .to_string(),
61 steps: actions
62 .iter()
63 .map(|action| crate::WorkflowPlanStep {
64 step_id: action.action_id.clone(),
65 kind: "workflow_action".to_string(),
66 objective: workflow_action_objective(
67 &action.spec.action,
68 action.spec.with.as_ref(),
69 ),
70 depends_on: Vec::new(),
71 agent_role: "operator".to_string(),
72 input_refs: Vec::new(),
73 output_contract: Some(crate::AutomationFlowOutputContract {
74 kind: "generic_artifact".to_string(),
75 validator: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
76 enforcement: None,
77 schema: None,
78 summary_guidance: None,
79 }),
80 metadata: None,
81 })
82 .collect(),
83 requires_integrations: Vec::new(),
84 allowed_mcp_servers: Vec::new(),
85 operator_preferences: Some(json!({
86 "source": source_label,
87 "tool_access_mode": "auto",
88 })),
89 save_options: json!({
90 "origin": source_label,
91 "workflow_source": source,
92 "trigger_event": trigger_event,
93 }),
94 }
95}
96
97pub(crate) fn compile_workflow_spec_to_automation_preview(
98 workflow: &WorkflowSpec,
99) -> crate::AutomationV2Spec {
100 let actions = workflow
101 .steps
102 .iter()
103 .map(|step| PreparedWorkflowAction {
104 action_id: step.step_id.clone(),
105 spec: WorkflowActionSpec {
106 action: step.action.clone(),
107 with: step.with.clone(),
108 },
109 })
110 .collect::<Vec<_>>();
111 let mut automation = crate::http::compile_plan_to_automation_v2(
112 &workflow_execution_plan(
113 &workflow.workflow_id,
114 &workflow.name,
115 workflow.description.clone(),
116 &actions,
117 "workflow_registry",
118 workflow.source.as_ref(),
119 None,
120 ),
121 None,
122 "workflow_registry",
123 );
124 if let Some(metadata) = automation.metadata.as_mut().and_then(Value::as_object_mut) {
125 metadata.insert("workflow_id".to_string(), json!(workflow.workflow_id));
126 metadata.insert("workflow_name".to_string(), json!(workflow.name));
127 metadata.insert("workflow_source".to_string(), json!(workflow.source));
128 metadata.insert("workflow_enabled".to_string(), json!(workflow.enabled));
129 }
130 automation
131}
132
133fn compile_workflow_run_automation(
134 workflow_id: &str,
135 workflow_name: Option<&str>,
136 workflow_description: Option<&str>,
137 binding_id: Option<&str>,
138 actions: &[PreparedWorkflowAction],
139 source: Option<&WorkflowSourceRef>,
140 trigger_event: Option<&str>,
141) -> crate::AutomationV2Spec {
142 let automation_id = binding_id
143 .map(|binding| format!("workflow-hook-automation-{workflow_id}-{binding}"))
144 .unwrap_or_else(|| format!("workflow-automation-{workflow_id}"));
145 let title = binding_id
146 .map(|binding| {
147 workflow_name
148 .map(|name| format!("{name} hook {binding}"))
149 .unwrap_or_else(|| format!("Workflow Hook {workflow_id}:{binding}"))
150 })
151 .unwrap_or_else(|| {
152 workflow_name
153 .map(|name| format!("{name} execution"))
154 .unwrap_or_else(|| format!("Workflow {workflow_id}"))
155 });
156 let mut automation = crate::http::compile_plan_to_automation_v2(
157 &workflow_execution_plan(
158 &automation_id,
159 &title,
160 Some(
161 workflow_description
162 .map(|description| description.to_string())
163 .unwrap_or_else(|| format!("Mirrored workflow execution for `{workflow_id}`.")),
164 ),
165 actions,
166 "workflow_runtime",
167 source,
168 trigger_event,
169 ),
170 None,
171 "workflow_runtime",
172 );
173 automation.automation_id = automation_id.clone();
174 automation.name = title;
175 automation.metadata = Some(json!({
176 "workflow_id": workflow_id,
177 "binding_id": binding_id,
178 "workflow_source": source,
179 "trigger_event": trigger_event,
180 "origin": "workflow_runtime_mirror",
181 }));
182 automation
183}
184
185async fn sync_workflow_automation_run_start(
186 state: &AppState,
187 automation: &crate::AutomationV2Spec,
188 run_id: &str,
189) -> anyhow::Result<crate::AutomationV2RunRecord> {
190 let updated = state
191 .update_automation_v2_run(run_id, |run| {
192 run.status = crate::AutomationRunStatus::Running;
193 run.started_at_ms.get_or_insert_with(now_ms);
194 crate::app::state::automation::lifecycle::record_automation_lifecycle_event(
195 run,
196 "workflow_run_started",
197 Some("workflow runtime mirror started".to_string()),
198 None,
199 );
200 crate::app::state::refresh_automation_runtime_state(automation, run);
201 })
202 .await
203 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
204 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
205 .await
206 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
207 Ok(updated)
208}
209
210async fn sync_workflow_automation_action_started(
211 state: &AppState,
212 automation: &crate::AutomationV2Spec,
213 run_id: &str,
214 action_id: &str,
215) -> anyhow::Result<crate::AutomationV2RunRecord> {
216 let updated = state
217 .update_automation_v2_run(run_id, |run| {
218 let next_attempt = run
219 .checkpoint
220 .node_attempts
221 .get(action_id)
222 .copied()
223 .unwrap_or(0)
224 .saturating_add(1);
225 run.checkpoint
226 .node_attempts
227 .insert(action_id.to_string(), next_attempt);
228 run.detail = Some(format!("Running workflow action `{action_id}`"));
229 crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
230 run,
231 "workflow_action_started",
232 Some(format!("workflow action `{action_id}` started")),
233 None,
234 Some(json!({
235 "action_id": action_id,
236 "attempt": next_attempt,
237 })),
238 );
239 crate::app::state::refresh_automation_runtime_state(automation, run);
240 })
241 .await
242 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
243 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
244 .await
245 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
246 Ok(updated)
247}
248
249async fn sync_workflow_automation_action_completed(
250 state: &AppState,
251 automation: &crate::AutomationV2Spec,
252 run_id: &str,
253 action_id: &str,
254 output: &Value,
255) -> anyhow::Result<crate::AutomationV2RunRecord> {
256 let action_count = automation.flow.nodes.len();
257 let updated = state
258 .update_automation_v2_run(run_id, |run| {
259 run.checkpoint.pending_nodes.retain(|id| id != action_id);
260 if !run
261 .checkpoint
262 .completed_nodes
263 .iter()
264 .any(|id| id == action_id)
265 {
266 run.checkpoint.completed_nodes.push(action_id.to_string());
267 }
268 run.checkpoint.node_outputs.insert(
269 action_id.to_string(),
270 json!(crate::AutomationNodeOutput {
271 contract_kind: "generic_artifact".to_string(),
272 validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
273 validator_summary: Some(crate::AutomationValidatorSummary {
274 kind: crate::AutomationOutputValidatorKind::GenericArtifact,
275 outcome: "accepted".to_string(),
276 reason: Some("workflow action completed".to_string()),
277 unmet_requirements: Vec::new(),
278 warning_requirements: Vec::new(),
279 warning_count: 0,
280 accepted_candidate_source: Some("workflow_runtime".to_string()),
281 verification_outcome: Some("not_applicable".to_string()),
282 validation_basis: None,
283 repair_attempted: false,
284 repair_attempt: 0,
285 repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
286 as u32,
287 repair_succeeded: false,
288 repair_exhausted: false,
289 }),
290 summary: format!("Workflow action `{action_id}` completed"),
291 content: json!({
292 "action_id": action_id,
293 "output": output,
294 }),
295 created_at_ms: now_ms(),
296 node_id: action_id.to_string(),
297 status: Some("completed".to_string()),
298 blocked_reason: None,
299 approved: None,
300 workflow_class: Some("workflow_action".to_string()),
301 phase: Some("execution".to_string()),
302 failure_kind: None,
303 tool_telemetry: None,
304 preflight: None,
305 knowledge_preflight: None,
306 capability_resolution: None,
307 attempt_evidence: None,
308 blocker_category: None,
309 fallback_used: None,
310 artifact_validation: None,
311 receipt_timeline: None,
312 quality_mode: Some("strict_research_v1".to_string()),
313 requested_quality_mode: None,
314 emergency_rollback_enabled: Some(false),
315 provenance: Some(crate::AutomationNodeOutputProvenance {
316 session_id: format!("workflow-runtime-{run_id}"),
317 node_id: action_id.to_string(),
318 run_id: Some(run_id.to_string()),
319 output_path: None,
320 content_digest: None,
321 accepted_candidate_source: Some("workflow_runtime".to_string()),
322 validation_outcome: Some("not_applicable".to_string()),
323 repair_attempt: Some(0),
324 repair_succeeded: Some(false),
325 reuse_allowed: Some(false),
326 freshness: crate::AutomationNodeOutputFreshness {
327 current_run: true,
328 current_attempt: true,
329 },
330 }),
331 }),
332 );
333 if run.checkpoint.completed_nodes.len() >= action_count {
334 run.status = crate::AutomationRunStatus::Completed;
335 run.detail = Some("workflow runtime mirror completed".to_string());
336 }
337 crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
338 run,
339 "workflow_action_completed",
340 Some(format!("workflow action `{action_id}` completed")),
341 None,
342 Some(json!({
343 "action_id": action_id,
344 })),
345 );
346 crate::app::state::refresh_automation_runtime_state(automation, run);
347 })
348 .await
349 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
350 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
351 .await
352 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
353 Ok(updated)
354}
355
356async fn sync_workflow_automation_action_failed(
357 state: &AppState,
358 automation: &crate::AutomationV2Spec,
359 run_id: &str,
360 action_id: &str,
361 error: &str,
362) -> anyhow::Result<crate::AutomationV2RunRecord> {
363 let updated = state
364 .update_automation_v2_run(run_id, |run| {
365 run.status = crate::AutomationRunStatus::Failed;
366 run.detail = Some(format!("Workflow action `{action_id}` failed"));
367 run.checkpoint.pending_nodes.retain(|id| id != action_id);
368 run.checkpoint.last_failure = Some(crate::AutomationFailureRecord {
369 node_id: action_id.to_string(),
370 reason: error.to_string(),
371 failed_at_ms: now_ms(),
372 });
373 run.checkpoint.node_outputs.insert(
374 action_id.to_string(),
375 json!(crate::AutomationNodeOutput {
376 contract_kind: "generic_artifact".to_string(),
377 validator_kind: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
378 validator_summary: Some(crate::AutomationValidatorSummary {
379 kind: crate::AutomationOutputValidatorKind::GenericArtifact,
380 outcome: "rejected".to_string(),
381 reason: Some(error.to_string()),
382 unmet_requirements: vec![error.to_string()],
383 warning_requirements: Vec::new(),
384 warning_count: 0,
385 accepted_candidate_source: None,
386 verification_outcome: Some("failed".to_string()),
387 validation_basis: None,
388 repair_attempted: false,
389 repair_attempt: 0,
390 repair_attempts_remaining: tandem_core::prewrite_repair_retry_max_attempts()
391 as u32,
392 repair_succeeded: false,
393 repair_exhausted: false,
394 }),
395 summary: format!("Workflow action `{action_id}` failed"),
396 content: json!({
397 "action_id": action_id,
398 "error": error,
399 }),
400 created_at_ms: now_ms(),
401 node_id: action_id.to_string(),
402 status: Some("failed".to_string()),
403 blocked_reason: Some(error.to_string()),
404 approved: None,
405 workflow_class: Some("workflow_action".to_string()),
406 phase: Some("execution".to_string()),
407 failure_kind: Some("workflow_action_failed".to_string()),
408 tool_telemetry: None,
409 preflight: None,
410 knowledge_preflight: None,
411 capability_resolution: None,
412 attempt_evidence: None,
413 blocker_category: Some("tool_result_unusable".to_string()),
414 fallback_used: None,
415 artifact_validation: None,
416 receipt_timeline: None,
417 quality_mode: Some("strict_research_v1".to_string()),
418 requested_quality_mode: None,
419 emergency_rollback_enabled: Some(false),
420 provenance: Some(crate::AutomationNodeOutputProvenance {
421 session_id: format!("workflow-runtime-{run_id}"),
422 node_id: action_id.to_string(),
423 run_id: Some(run_id.to_string()),
424 output_path: None,
425 content_digest: None,
426 accepted_candidate_source: None,
427 validation_outcome: Some("failed".to_string()),
428 repair_attempt: Some(0),
429 repair_succeeded: Some(false),
430 reuse_allowed: Some(false),
431 freshness: crate::AutomationNodeOutputFreshness {
432 current_run: true,
433 current_attempt: true,
434 },
435 }),
436 }),
437 );
438 crate::app::state::automation::lifecycle::record_automation_lifecycle_event_with_metadata(
439 run,
440 "workflow_action_failed",
441 Some(format!("workflow action `{action_id}` failed")),
442 None,
443 Some(json!({
444 "action_id": action_id,
445 "error": error,
446 })),
447 );
448 crate::app::state::refresh_automation_runtime_state(automation, run);
449 })
450 .await
451 .ok_or_else(|| anyhow::anyhow!("automation run `{run_id}` not found"))?;
452 crate::http::context_runs::sync_automation_v2_run_blackboard(state, automation, &updated)
453 .await
454 .map_err(|status| anyhow::anyhow!("failed to sync workflow automation run: {status}"))?;
455 Ok(updated)
456}
457
458#[derive(Debug, Clone)]
459pub enum ParsedWorkflowAction {
460 EventEmit { event_type: String },
461 ResourcePut { key: String },
462 ResourcePatch { key: String },
463 ResourceDelete { key: String },
464 Tool { tool_name: String },
465 Capability { capability_id: String },
466 Workflow { workflow_id: String },
467 Agent { agent_id: String },
468}
469
470pub fn parse_workflow_action(action: &str) -> ParsedWorkflowAction {
471 let trimmed = action.trim();
472 if let Some(rest) = trimmed.strip_prefix("event:") {
473 return ParsedWorkflowAction::EventEmit {
474 event_type: rest.trim().to_string(),
475 };
476 }
477 if let Some(rest) = trimmed.strip_prefix("resource:put:") {
478 return ParsedWorkflowAction::ResourcePut {
479 key: rest.trim().to_string(),
480 };
481 }
482 if let Some(rest) = trimmed.strip_prefix("resource:patch:") {
483 return ParsedWorkflowAction::ResourcePatch {
484 key: rest.trim().to_string(),
485 };
486 }
487 if let Some(rest) = trimmed.strip_prefix("resource:delete:") {
488 return ParsedWorkflowAction::ResourceDelete {
489 key: rest.trim().to_string(),
490 };
491 }
492 if let Some(rest) = trimmed.strip_prefix("tool:") {
493 return ParsedWorkflowAction::Tool {
494 tool_name: rest.trim().to_string(),
495 };
496 }
497 if let Some(rest) = trimmed.strip_prefix("capability:") {
498 return ParsedWorkflowAction::Capability {
499 capability_id: rest.trim().to_string(),
500 };
501 }
502 if let Some(rest) = trimmed.strip_prefix("workflow:") {
503 return ParsedWorkflowAction::Workflow {
504 workflow_id: rest.trim().to_string(),
505 };
506 }
507 if let Some(rest) = trimmed.strip_prefix("agent:") {
508 return ParsedWorkflowAction::Agent {
509 agent_id: rest.trim().to_string(),
510 };
511 }
512 ParsedWorkflowAction::Capability {
513 capability_id: trimmed.to_string(),
514 }
515}
516
517pub fn canonical_workflow_event_names(event: &EngineEvent) -> Vec<String> {
518 let mut names = vec![event.event_type.clone(), event.event_type.replace('.', "_")];
519 match event.event_type.as_str() {
520 "context.task.created" => names.push("task_created".to_string()),
521 "context.task.started" => names.push("task_started".to_string()),
522 "context.task.completed" => names.push("task_completed".to_string()),
523 "context.task.failed" => names.push("task_failed".to_string()),
524 "workflow.run.started" | "routine.run.created" => {
525 names.push("workflow_started".to_string())
526 }
527 "workflow.run.completed" | "routine.run.completed" => {
528 names.push("workflow_completed".to_string())
529 }
530 "workflow.run.failed" | "routine.run.failed" => names.push("task_failed".to_string()),
531 _ => {}
532 }
533 names.sort();
534 names.dedup();
535 names
536}
537
538pub async fn simulate_workflow_event(
539 state: &AppState,
540 event: &EngineEvent,
541) -> WorkflowSimulationResult {
542 let registry = state.workflow_registry().await;
543 let canonical = canonical_workflow_event_names(event);
544 let matched_bindings = registry
545 .hooks
546 .into_iter()
547 .filter(|hook| {
548 hook.enabled
549 && canonical
550 .iter()
551 .any(|name| event_name_matches(&hook.event, name))
552 })
553 .collect::<Vec<_>>();
554 let planned_actions = matched_bindings
555 .iter()
556 .flat_map(|hook| hook.actions.clone())
557 .collect::<Vec<_>>();
558 WorkflowSimulationResult {
559 matched_bindings,
560 planned_actions,
561 canonical_events: canonical,
562 }
563}
564
565pub async fn dispatch_workflow_event(state: &AppState, event: &EngineEvent) {
566 let simulation = simulate_workflow_event(state, event).await;
567 if simulation.matched_bindings.is_empty() {
568 return;
569 }
570 for hook in simulation.matched_bindings {
571 let source_event_id = source_event_id(event);
572 let task_id = task_id_from_event(event);
573 let dedupe_key = format!("{}::{source_event_id}", hook.binding_id);
574 {
575 let mut seen = state.workflow_dispatch_seen.write().await;
576 if seen.contains_key(&dedupe_key) {
577 continue;
578 }
579 seen.insert(dedupe_key, now_ms());
580 }
581 let _ = execute_hook_binding(
582 state,
583 &hook,
584 Some(event.event_type.clone()),
585 Some(source_event_id),
586 task_id,
587 false,
588 )
589 .await;
590 }
591}
592
593pub async fn run_workflow_dispatcher(state: AppState) {
594 let mut rx = state.event_bus.subscribe();
595 loop {
596 match rx.recv().await {
597 Ok(event) => dispatch_workflow_event(&state, &event).await,
598 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
599 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
600 }
601 }
602}
603
604pub async fn execute_workflow(
605 state: &AppState,
606 workflow: &WorkflowSpec,
607 trigger_event: Option<String>,
608 source_event_id: Option<String>,
609 task_id: Option<String>,
610 dry_run: bool,
611) -> anyhow::Result<WorkflowRunRecord> {
612 let actions = workflow
613 .steps
614 .iter()
615 .map(|step| PreparedWorkflowAction {
616 action_id: step.step_id.clone(),
617 spec: WorkflowActionSpec {
618 action: step.action.clone(),
619 with: step.with.clone(),
620 },
621 })
622 .collect::<Vec<_>>();
623 execute_actions(
624 state,
625 &workflow.workflow_id,
626 None,
627 actions,
628 Some(workflow.name.clone()),
629 workflow.description.clone(),
630 workflow.source.clone(),
631 trigger_event,
632 source_event_id,
633 task_id,
634 dry_run,
635 )
636 .await
637}
638
639pub async fn execute_hook_binding(
640 state: &AppState,
641 hook: &WorkflowHookBinding,
642 trigger_event: Option<String>,
643 source_event_id: Option<String>,
644 task_id: Option<String>,
645 dry_run: bool,
646) -> anyhow::Result<WorkflowRunRecord> {
647 let workflow = state
648 .get_workflow(&hook.workflow_id)
649 .await
650 .with_context(|| format!("unknown workflow `{}`", hook.workflow_id))?;
651 execute_actions(
652 state,
653 &hook.workflow_id,
654 Some(hook.binding_id.clone()),
655 hook.actions
656 .iter()
657 .enumerate()
658 .map(|(idx, action)| PreparedWorkflowAction {
659 action_id: format!("action_{}", idx + 1),
660 spec: action.clone(),
661 })
662 .collect(),
663 None,
664 None,
665 workflow.source,
666 trigger_event,
667 source_event_id,
668 task_id,
669 dry_run,
670 )
671 .await
672}
673
/// Runs a prepared list of workflow actions in order and records the outcome.
///
/// Steps, in order:
/// 1. Compile and store a mirror automation, create an automation run for it
///    and mark that run as started. This happens for dry runs as well.
/// 2. Persist a `WorkflowRunRecord` (status `DryRun` or `Running`) and publish
///    `workflow.run.started`.
/// 3. For a dry run, return the record immediately; actions stay `Skipped`.
/// 4. Otherwise execute each action sequentially, persisting and publishing
///    its started/completed/failed state. Execution stops at the first
///    failing action.
/// 5. When all actions succeed, mark the run `Completed` and publish
///    `workflow.run.completed`.
///
/// Blackboard syncs and automation-mirror updates inside the loop are
/// best-effort: their errors are discarded.
///
/// # Errors
///
/// Returns an error when storing the automation, creating or starting the
/// automation run, or storing the workflow run fails, or when the workflow
/// run can no longer be found after a failure or on completion. A failing
/// action does NOT produce an `Err`: the stored run record (with `Failed`
/// status) is returned as `Ok`.
async fn execute_actions(
    state: &AppState,
    workflow_id: &str,
    binding_id: Option<String>,
    actions: Vec<PreparedWorkflowAction>,
    workflow_name: Option<String>,
    workflow_description: Option<String>,
    source: Option<WorkflowSourceRef>,
    trigger_event: Option<String>,
    source_event_id: Option<String>,
    task_id: Option<String>,
    dry_run: bool,
) -> anyhow::Result<WorkflowRunRecord> {
    let run_id = format!("workflow-run-{}", Uuid::new_v4());
    let now = now_ms();
    // Build and persist the automation that mirrors this workflow run.
    let automation = compile_workflow_run_automation(
        workflow_id,
        workflow_name.as_deref(),
        workflow_description.as_deref(),
        binding_id.as_deref(),
        &actions,
        source.as_ref(),
        trigger_event.as_deref(),
    );
    let automation = state.put_automation_v2(automation).await?;
    let automation_run = state
        .create_automation_v2_run(&automation, trigger_event.as_deref().unwrap_or("workflow"))
        .await?;
    // NOTE(review): the mirrored run is marked started before the dry-run
    // check below, and nothing in this function finishes it for a dry run.
    let automation_run =
        sync_workflow_automation_run_start(state, &automation, &automation_run.run_id).await?;
    let mut run = WorkflowRunRecord {
        run_id: run_id.clone(),
        workflow_id: workflow_id.to_string(),
        automation_id: Some(automation.automation_id.clone()),
        automation_run_id: Some(automation_run.run_id.clone()),
        binding_id,
        trigger_event: trigger_event.clone(),
        source_event_id: source_event_id.clone(),
        task_id: task_id.clone(),
        status: if dry_run {
            WorkflowRunStatus::DryRun
        } else {
            WorkflowRunStatus::Running
        },
        created_at_ms: now,
        updated_at_ms: now,
        // A dry run is considered finished as soon as it is created.
        finished_at_ms: if dry_run { Some(now) } else { None },
        actions: actions
            .iter()
            .map(|action| WorkflowActionRunRecord {
                action_id: action.action_id.clone(),
                action: action.spec.action.clone(),
                task_id: task_id.clone(),
                status: if dry_run {
                    WorkflowActionRunStatus::Skipped
                } else {
                    WorkflowActionRunStatus::Pending
                },
                detail: None,
                output: None,
                updated_at_ms: now,
            })
            .collect(),
        source,
    };
    state.put_workflow_run(run.clone()).await?;
    // Best-effort: a blackboard sync failure does not abort the run.
    let _ = crate::http::sync_workflow_run_blackboard(state, &run).await;
    state.event_bus.publish(EngineEvent::new(
        "workflow.run.started",
        json!({
            "runID": run.run_id,
            "workflowID": run.workflow_id,
            "bindingID": run.binding_id,
            "triggerEvent": trigger_event,
            "sourceEventID": source_event_id,
            "taskID": task_id,
            "dryRun": dry_run,
        }),
    ));
    if dry_run {
        return Ok(run);
    }
    // `run.actions` was built from `actions`, so the two line up one-to-one.
    for (action_row, action_spec) in run.actions.iter_mut().zip(actions.iter()) {
        action_row.status = WorkflowActionRunStatus::Running;
        action_row.updated_at_ms = now_ms();
        let action_name = action_row.action.clone();
        // Best-effort mirror update; errors are ignored.
        let _ = sync_workflow_automation_action_started(
            state,
            &automation,
            automation_run.run_id.as_str(),
            &action_row.action_id,
        )
        .await;
        // Copy the in-memory action row into the stored run record.
        state
            .update_workflow_run(&run.run_id, |row| {
                if let Some(target) = row
                    .actions
                    .iter_mut()
                    .find(|item| item.action_id == action_row.action_id)
                {
                    *target = action_row.clone();
                }
            })
            .await;
        if let Some(latest) = state.get_workflow_run(&run.run_id).await {
            let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
        }
        state.event_bus.publish(EngineEvent::new(
            "workflow.action.started",
            json!({
                "runID": run.run_id,
                "workflowID": run.workflow_id,
                "actionID": action_row.action_id,
                "action": action_name,
                "taskID": run.task_id,
            }),
        ));
        match execute_action(
            state,
            &run.run_id,
            workflow_id,
            &action_spec.spec,
            action_row,
            trigger_event.clone(),
        )
        .await
        {
            Ok(output) => {
                action_row.status = WorkflowActionRunStatus::Completed;
                action_row.output = Some(output.clone());
                action_row.updated_at_ms = now_ms();
                state
                    .update_workflow_run(&run.run_id, |row| {
                        if let Some(target) = row
                            .actions
                            .iter_mut()
                            .find(|item| item.action_id == action_row.action_id)
                        {
                            *target = action_row.clone();
                        }
                    })
                    .await;
                // Best-effort mirror update; errors are ignored.
                let _ = sync_workflow_automation_action_completed(
                    state,
                    &automation,
                    automation_run.run_id.as_str(),
                    &action_row.action_id,
                    &output,
                )
                .await;
                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
                }
                state.event_bus.publish(EngineEvent::new(
                    "workflow.action.completed",
                    json!({
                        "runID": run.run_id,
                        "workflowID": run.workflow_id,
                        "actionID": action_row.action_id,
                        "action": action_name,
                        "taskID": run.task_id,
                        "output": output,
                    }),
                ));
            }
            Err(error) => {
                // First failure ends the run: record it, notify, and return.
                let detail = error.to_string();
                action_row.status = WorkflowActionRunStatus::Failed;
                action_row.detail = Some(detail.clone());
                action_row.updated_at_ms = now_ms();
                // NOTE(review): these local `run` fields are not read again;
                // the value returned below is re-fetched from state.
                run.status = WorkflowRunStatus::Failed;
                run.finished_at_ms = Some(now_ms());
                state
                    .update_workflow_run(&run.run_id, |row| {
                        row.status = WorkflowRunStatus::Failed;
                        row.finished_at_ms = Some(now_ms());
                        if let Some(target) = row
                            .actions
                            .iter_mut()
                            .find(|item| item.action_id == action_row.action_id)
                        {
                            *target = action_row.clone();
                        }
                    })
                    .await;
                // Best-effort mirror update; errors are ignored.
                let _ = sync_workflow_automation_action_failed(
                    state,
                    &automation,
                    automation_run.run_id.as_str(),
                    &action_row.action_id,
                    &detail,
                )
                .await;
                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
                }
                state.event_bus.publish(EngineEvent::new(
                    "workflow.action.failed",
                    json!({
                        "runID": run.run_id,
                        "workflowID": run.workflow_id,
                        "actionID": action_row.action_id,
                        "action": action_name,
                        "taskID": run.task_id,
                        "error": detail,
                    }),
                ));
                state.event_bus.publish(EngineEvent::new(
                    "workflow.run.failed",
                    json!({
                        "runID": run.run_id,
                        "workflowID": run.workflow_id,
                        "actionID": action_row.action_id,
                        "taskID": run.task_id,
                        "error": action_row.detail,
                    }),
                ));
                // Return the stored record (Ok) rather than the action error.
                return state.get_workflow_run(&run.run_id).await.with_context(|| {
                    format!("workflow run `{}` missing after failure", run.run_id)
                });
            }
        }
    }
    // All actions succeeded.
    run.status = WorkflowRunStatus::Completed;
    run.finished_at_ms = Some(now_ms());
    let final_run = state
        .update_workflow_run(&run.run_id, |row| {
            row.status = WorkflowRunStatus::Completed;
            row.finished_at_ms = Some(now_ms());
        })
        .await
        .with_context(|| format!("workflow run `{}` missing on completion", run.run_id))?;
    let _ = crate::http::sync_workflow_run_blackboard(state, &final_run).await;
    state.event_bus.publish(EngineEvent::new(
        "workflow.run.completed",
        json!({
            "runID": final_run.run_id,
            "workflowID": final_run.workflow_id,
            "bindingID": final_run.binding_id,
            "taskID": final_run.task_id,
        }),
    ));
    Ok(final_run)
}
918
/// Executes a single workflow action and returns a JSON summary of the result.
///
/// The action string is parsed with `parse_workflow_action` and dispatched by
/// kind:
/// * event emit - publishes an event carrying the action payload;
/// * resource put / patch / delete - writes, merges into, or removes a shared
///   resource;
/// * tool / capability - runs a tool (a capability is resolved to its bound
///   tool, falling back to the capability id as the tool name) and attaches an
///   `external_action` entry to the response when one is recorded;
/// * workflow - always fails; nested workflows are not supported;
/// * agent - creates a session and starts an asynchronous prompt for the agent.
///
/// # Errors
///
/// Returns an error when the underlying resource, tool, capability, storage or
/// engine call fails, and always for nested workflow actions.
async fn execute_action(
    state: &AppState,
    run_id: &str,
    workflow_id: &str,
    action_spec: &WorkflowActionSpec,
    action_row: &WorkflowActionRunRecord,
    trigger_event: Option<String>,
) -> anyhow::Result<Value> {
    let action_name = action_spec.action.as_str();
    let parsed = parse_workflow_action(action_name);
    match parsed {
        ParsedWorkflowAction::EventEmit { event_type } => {
            let payload = action_payload(action_spec, action_row);
            state.event_bus.publish(EngineEvent::new(
                event_type.clone(),
                json!({
                    "workflowID": workflow_id,
                    "actionID": action_row.action_id,
                    "triggerEvent": trigger_event,
                    "payload": payload,
                }),
            ));
            Ok(json!({ "eventType": event_type }))
        }
        ParsedWorkflowAction::ResourcePut { key } => {
            let record = state
                .put_shared_resource(
                    key.clone(),
                    action_payload(action_spec, action_row),
                    None,
                    "workflow".to_string(),
                    None,
                )
                .await
                // The error is rendered with its `Debug` form.
                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
            Ok(json!({ "key": record.key, "rev": record.rev }))
        }
        ParsedWorkflowAction::ResourcePatch { key } => {
            // Read the current value (if any), merge the payload into it and
            // write the result back, passing along the revision just read.
            let current = state.get_shared_resource(&key).await;
            let next_rev = current.as_ref().map(|row| row.rev);
            let record = state
                .put_shared_resource(
                    key.clone(),
                    merge_object(
                        // A missing resource is patched as an empty object.
                        current.map(|row| row.value).unwrap_or_else(|| json!({})),
                        action_payload(action_spec, action_row),
                    ),
                    next_rev,
                    "workflow".to_string(),
                    None,
                )
                .await
                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
            Ok(json!({ "key": record.key, "rev": record.rev }))
        }
        ParsedWorkflowAction::ResourceDelete { key } => {
            let deleted = state
                .delete_shared_resource(&key, None)
                .await
                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
            // `deleted` reports whether a resource was actually removed.
            Ok(json!({ "key": key, "deleted": deleted.is_some() }))
        }
        ParsedWorkflowAction::Tool { tool_name } => {
            let payload = action_payload(action_spec, action_row);
            let result = state.tools.execute(&tool_name, payload.clone()).await?;
            let mut response = json!({
                "tool": tool_name,
                "output": result.output,
                "metadata": result.metadata,
            });
            // Attach the recorded external action, when there is one.
            if let Some(external_action) = record_workflow_external_action(
                state,
                run_id,
                workflow_id,
                action_row,
                trigger_event.clone(),
                WorkflowExternalActionExecution::Tool {
                    tool_name: tool_name.clone(),
                },
                &payload,
                &response,
            )
            .await?
            {
                if let Some(obj) = response.as_object_mut() {
                    obj.insert("external_action".to_string(), external_action);
                }
            }
            Ok(response)
        }
        ParsedWorkflowAction::Capability { capability_id } => {
            let bindings = state.capability_resolver.list_bindings().await?;
            // Use the bound tool if a binding exists; otherwise treat the
            // capability id itself as the tool name.
            let tool_name = bindings
                .bindings
                .iter()
                .find(|binding| binding.capability_id == capability_id)
                .map(|binding| binding.tool_name.clone())
                .unwrap_or_else(|| capability_id.clone());
            let payload = action_payload(action_spec, action_row);
            let result = state.tools.execute(&tool_name, payload.clone()).await?;
            let mut response = json!({
                "capability": capability_id,
                "tool": tool_name,
                "output": result.output,
                "metadata": result.metadata,
            });
            // Attach the recorded external action, when there is one.
            if let Some(external_action) = record_workflow_external_action(
                state,
                run_id,
                workflow_id,
                action_row,
                trigger_event.clone(),
                WorkflowExternalActionExecution::Capability {
                    capability_id: capability_id.clone(),
                    tool_name: tool_name.clone(),
                },
                &payload,
                &response,
            )
            .await?
            {
                if let Some(obj) = response.as_object_mut() {
                    obj.insert("external_action".to_string(), external_action);
                }
            }
            Ok(response)
        }
        ParsedWorkflowAction::Workflow { workflow_id } => {
            anyhow::bail!("nested workflow action `{workflow_id}` is not supported in this slice")
        }
        ParsedWorkflowAction::Agent { agent_id } => {
            let workspace_root = state.workspace_index.snapshot().await.root;
            let session = Session::new(
                Some(format!("Workflow {} / {}", workflow_id, agent_id)),
                Some(workspace_root.clone()),
            );
            let session_id = session.id.clone();
            state.storage.save_session(session).await?;
            // Use the `prompt` string from the `with` payload when present;
            // otherwise fall back to a generic instruction.
            let prompt = action_spec
                .with
                .as_ref()
                .and_then(|v| v.get("prompt"))
                .and_then(|v| v.as_str())
                .map(ToString::to_string)
                .unwrap_or_else(|| format!("Execute workflow action `{}`", action_name));
            let request = SendMessageRequest {
                parts: vec![MessagePartInput::Text { text: prompt }],
                model: None,
                agent: Some(agent_id.clone()),
                tool_mode: None,
                tool_allowlist: None,
                context_mode: None,
                write_required: None,
                prewrite_requirements: None,
            };
            state
                .engine_loop
                .run_prompt_async_with_context(
                    session_id.clone(),
                    request,
                    Some(format!("workflow:{workflow_id}")),
                )
                .await?;
            Ok(json!({ "agentID": agent_id, "sessionID": session_id }))
        }
    }
}
1086
/// Describes how a workflow action reached an external tool, so the matching
/// capability binding can be located when the action is recorded.
///
/// Derives `Debug` and `Clone` to stay consistent with the other private
/// helper types in this file and to make the value loggable.
#[derive(Debug, Clone)]
enum WorkflowExternalActionExecution {
    /// The action named a tool directly.
    Tool {
        /// Name of the tool that was executed.
        tool_name: String,
    },
    /// The action named a capability that was mapped to a tool.
    Capability {
        /// Identifier of the capability requested by the action.
        capability_id: String,
        /// Name of the tool that was executed for the capability.
        tool_name: String,
    },
}
1096
1097async fn record_workflow_external_action(
1098 state: &AppState,
1099 run_id: &str,
1100 workflow_id: &str,
1101 action_row: &WorkflowActionRunRecord,
1102 trigger_event: Option<String>,
1103 execution: WorkflowExternalActionExecution,
1104 payload: &Value,
1105 result: &Value,
1106) -> anyhow::Result<Option<Value>> {
1107 let bindings = state.capability_resolver.list_bindings().await?;
1108 let binding = match execution {
1109 WorkflowExternalActionExecution::Tool { ref tool_name } => bindings
1110 .bindings
1111 .iter()
1112 .find(|binding| workflow_binding_matches_tool_name(binding, tool_name)),
1113 WorkflowExternalActionExecution::Capability {
1114 ref capability_id,
1115 ref tool_name,
1116 } => bindings.bindings.iter().find(|binding| {
1117 binding.capability_id == *capability_id
1118 && workflow_binding_matches_tool_name(binding, tool_name)
1119 }),
1120 };
1121 let Some(binding) = binding else {
1122 return Ok(None);
1123 };
1124
1125 let target = workflow_external_action_target(payload, result);
1126 let source_id = format!("{run_id}:{}", action_row.action_id);
1127 let idempotency_key = crate::sha256_hex(&[
1128 workflow_id,
1129 run_id,
1130 &action_row.action_id,
1131 &action_row.action,
1132 &payload.to_string(),
1133 ]);
1134 let action = crate::ExternalActionRecord {
1135 action_id: format!("workflow-external-{}", &idempotency_key[..16]),
1136 operation: binding.capability_id.clone(),
1137 status: "posted".to_string(),
1138 source_kind: Some("workflow".to_string()),
1139 source_id: Some(source_id.clone()),
1140 routine_run_id: None,
1141 context_run_id: Some(crate::http::context_runs::workflow_context_run_id(run_id)),
1142 capability_id: Some(binding.capability_id.clone()),
1143 provider: Some(binding.provider.clone()),
1144 target,
1145 approval_state: Some("executed".to_string()),
1146 idempotency_key: Some(idempotency_key),
1147 receipt: Some(result.clone()),
1148 error: None,
1149 metadata: Some(json!({
1150 "workflowID": workflow_id,
1151 "workflowRunID": run_id,
1152 "actionID": action_row.action_id,
1153 "action": action_row.action,
1154 "taskID": action_row.task_id,
1155 "triggerEvent": trigger_event,
1156 "tool": binding.tool_name,
1157 "provider": binding.provider,
1158 "input": payload,
1159 })),
1160 created_at_ms: action_row.updated_at_ms,
1161 updated_at_ms: action_row.updated_at_ms,
1162 };
1163 let recorded = state.record_external_action(action).await?;
1164 Ok(Some(serde_json::to_value(&recorded)?))
1165}
1166
1167fn workflow_binding_matches_tool_name(
1168 binding: &crate::capability_resolver::CapabilityBinding,
1169 tool_name: &str,
1170) -> bool {
1171 binding.tool_name.eq_ignore_ascii_case(tool_name)
1172 || binding
1173 .tool_name_aliases
1174 .iter()
1175 .any(|alias| alias.eq_ignore_ascii_case(tool_name))
1176}
1177
1178fn workflow_external_action_target(payload: &Value, result: &Value) -> Option<String> {
1179 for candidate in [
1180 payload.pointer("/owner_repo").and_then(Value::as_str),
1181 payload.pointer("/repo").and_then(Value::as_str),
1182 payload.pointer("/repository").and_then(Value::as_str),
1183 payload.pointer("/channel").and_then(Value::as_str),
1184 payload.pointer("/channel_id").and_then(Value::as_str),
1185 payload.pointer("/thread_ts").and_then(Value::as_str),
1186 result.pointer("/metadata/channel").and_then(Value::as_str),
1187 result.pointer("/metadata/repo").and_then(Value::as_str),
1188 ] {
1189 let trimmed = candidate.map(str::trim).unwrap_or_default();
1190 if !trimmed.is_empty() {
1191 return Some(trimmed.to_string());
1192 }
1193 }
1194 None
1195}
1196
1197fn action_payload(action_spec: &WorkflowActionSpec, action_row: &WorkflowActionRunRecord) -> Value {
1198 action_spec
1199 .with
1200 .clone()
1201 .unwrap_or_else(|| json!({ "action_id": action_row.action_id }))
1202}
1203
1204fn merge_object(current: Value, patch: Value) -> Value {
1205 if let (Some(mut current_obj), Some(patch_obj)) =
1206 (current.as_object().cloned(), patch.as_object())
1207 {
1208 for (key, value) in patch_obj {
1209 current_obj.insert(key.clone(), value.clone());
1210 }
1211 Value::Object(current_obj)
1212 } else {
1213 patch
1214 }
1215}
1216
1217fn source_event_id(event: &EngineEvent) -> String {
1218 if let Some(id) = event.properties.get("event_id").and_then(|v| v.as_str()) {
1219 return id.to_string();
1220 }
1221 for key in ["runID", "runId", "task_id", "taskID", "sessionID"] {
1222 if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1223 return format!("{}:{id}", event.event_type);
1224 }
1225 }
1226 format!("{}:{}", event.event_type, event.properties)
1227}
1228
1229fn task_id_from_event(event: &EngineEvent) -> Option<String> {
1230 for key in ["task_id", "taskID", "step_id", "stepID"] {
1231 if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
1232 let trimmed = id.trim();
1233 if !trimmed.is_empty() {
1234 return Some(trimmed.to_string());
1235 }
1236 }
1237 }
1238 None
1239}
1240
/// Compares two event names, ignoring surrounding whitespace and ASCII case.
fn event_name_matches(expected: &str, actual: &str) -> bool {
    let wanted = expected.trim();
    let seen = actual.trim();
    wanted.eq_ignore_ascii_case(seen)
}