1use anyhow::Context;
2use serde_json::{json, Value};
3use tandem_types::{EngineEvent, MessagePartInput, SendMessageRequest, Session};
4use tandem_workflows::{
5 WorkflowActionRunRecord, WorkflowActionRunStatus, WorkflowActionSpec, WorkflowHookBinding,
6 WorkflowRunRecord, WorkflowRunStatus, WorkflowSimulationResult, WorkflowSpec,
7};
8use uuid::Uuid;
9
10use crate::{now_ms, AppState, WorkflowSourceRef};
11
/// A workflow action string decoded into its kind and target.
///
/// Produced by [`parse_workflow_action`]; every variant carries the trimmed
/// text that followed the recognised prefix. Values compare by variant and
/// payload, which makes parsed actions usable in assertions and lookups.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParsedWorkflowAction {
    /// `event:<type>` — the event type to emit.
    EventEmit { event_type: String },
    /// `resource:put:<key>` — the shared-resource key to write.
    ResourcePut { key: String },
    /// `resource:patch:<key>` — the shared-resource key to merge into.
    ResourcePatch { key: String },
    /// `resource:delete:<key>` — the shared-resource key to remove.
    ResourceDelete { key: String },
    /// `tool:<name>` — the tool to execute.
    Tool { tool_name: String },
    /// `capability:<id>`, and also the fallback for any string without a
    /// recognised prefix.
    Capability { capability_id: String },
    /// `workflow:<id>` — a reference to another workflow.
    Workflow { workflow_id: String },
    /// `agent:<id>` — the agent to prompt.
    Agent { agent_id: String },
}
23
24pub fn parse_workflow_action(action: &str) -> ParsedWorkflowAction {
25 let trimmed = action.trim();
26 if let Some(rest) = trimmed.strip_prefix("event:") {
27 return ParsedWorkflowAction::EventEmit {
28 event_type: rest.trim().to_string(),
29 };
30 }
31 if let Some(rest) = trimmed.strip_prefix("resource:put:") {
32 return ParsedWorkflowAction::ResourcePut {
33 key: rest.trim().to_string(),
34 };
35 }
36 if let Some(rest) = trimmed.strip_prefix("resource:patch:") {
37 return ParsedWorkflowAction::ResourcePatch {
38 key: rest.trim().to_string(),
39 };
40 }
41 if let Some(rest) = trimmed.strip_prefix("resource:delete:") {
42 return ParsedWorkflowAction::ResourceDelete {
43 key: rest.trim().to_string(),
44 };
45 }
46 if let Some(rest) = trimmed.strip_prefix("tool:") {
47 return ParsedWorkflowAction::Tool {
48 tool_name: rest.trim().to_string(),
49 };
50 }
51 if let Some(rest) = trimmed.strip_prefix("capability:") {
52 return ParsedWorkflowAction::Capability {
53 capability_id: rest.trim().to_string(),
54 };
55 }
56 if let Some(rest) = trimmed.strip_prefix("workflow:") {
57 return ParsedWorkflowAction::Workflow {
58 workflow_id: rest.trim().to_string(),
59 };
60 }
61 if let Some(rest) = trimmed.strip_prefix("agent:") {
62 return ParsedWorkflowAction::Agent {
63 agent_id: rest.trim().to_string(),
64 };
65 }
66 ParsedWorkflowAction::Capability {
67 capability_id: trimmed.to_string(),
68 }
69}
70
71pub fn canonical_workflow_event_names(event: &EngineEvent) -> Vec<String> {
72 let mut names = vec![event.event_type.clone(), event.event_type.replace('.', "_")];
73 match event.event_type.as_str() {
74 "context.task.created" => names.push("task_created".to_string()),
75 "context.task.started" => names.push("task_started".to_string()),
76 "context.task.completed" => names.push("task_completed".to_string()),
77 "context.task.failed" => names.push("task_failed".to_string()),
78 "workflow.run.started" | "routine.run.created" => {
79 names.push("workflow_started".to_string())
80 }
81 "workflow.run.completed" | "routine.run.completed" => {
82 names.push("workflow_completed".to_string())
83 }
84 "workflow.run.failed" | "routine.run.failed" => names.push("task_failed".to_string()),
85 _ => {}
86 }
87 names.sort();
88 names.dedup();
89 names
90}
91
92pub async fn simulate_workflow_event(
93 state: &AppState,
94 event: &EngineEvent,
95) -> WorkflowSimulationResult {
96 let registry = state.workflow_registry().await;
97 let canonical = canonical_workflow_event_names(event);
98 let matched_bindings = registry
99 .hooks
100 .into_iter()
101 .filter(|hook| {
102 hook.enabled
103 && canonical
104 .iter()
105 .any(|name| event_name_matches(&hook.event, name))
106 })
107 .collect::<Vec<_>>();
108 let planned_actions = matched_bindings
109 .iter()
110 .flat_map(|hook| hook.actions.clone())
111 .collect::<Vec<_>>();
112 WorkflowSimulationResult {
113 matched_bindings,
114 planned_actions,
115 canonical_events: canonical,
116 }
117}
118
119pub async fn dispatch_workflow_event(state: &AppState, event: &EngineEvent) {
120 let simulation = simulate_workflow_event(state, event).await;
121 if simulation.matched_bindings.is_empty() {
122 return;
123 }
124 for hook in simulation.matched_bindings {
125 let source_event_id = source_event_id(event);
126 let task_id = task_id_from_event(event);
127 let dedupe_key = format!("{}::{source_event_id}", hook.binding_id);
128 {
129 let mut seen = state.workflow_dispatch_seen.write().await;
130 if seen.contains_key(&dedupe_key) {
131 continue;
132 }
133 seen.insert(dedupe_key, now_ms());
134 }
135 let _ = execute_hook_binding(
136 state,
137 &hook,
138 Some(event.event_type.clone()),
139 Some(source_event_id),
140 task_id,
141 false,
142 )
143 .await;
144 }
145}
146
147pub async fn run_workflow_dispatcher(state: AppState) {
148 let mut rx = state.event_bus.subscribe();
149 loop {
150 match rx.recv().await {
151 Ok(event) => dispatch_workflow_event(&state, &event).await,
152 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
153 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
154 }
155 }
156}
157
158pub async fn execute_workflow(
159 state: &AppState,
160 workflow: &WorkflowSpec,
161 trigger_event: Option<String>,
162 source_event_id: Option<String>,
163 task_id: Option<String>,
164 dry_run: bool,
165) -> anyhow::Result<WorkflowRunRecord> {
166 let actions = workflow
167 .steps
168 .iter()
169 .map(|step| WorkflowActionSpec {
170 action: step.action.clone(),
171 with: step.with.clone(),
172 })
173 .collect::<Vec<_>>();
174 execute_actions(
175 state,
176 &workflow.workflow_id,
177 None,
178 actions,
179 workflow.source.clone(),
180 trigger_event,
181 source_event_id,
182 task_id,
183 dry_run,
184 )
185 .await
186}
187
188pub async fn execute_hook_binding(
189 state: &AppState,
190 hook: &WorkflowHookBinding,
191 trigger_event: Option<String>,
192 source_event_id: Option<String>,
193 task_id: Option<String>,
194 dry_run: bool,
195) -> anyhow::Result<WorkflowRunRecord> {
196 let workflow = state
197 .get_workflow(&hook.workflow_id)
198 .await
199 .with_context(|| format!("unknown workflow `{}`", hook.workflow_id))?;
200 execute_actions(
201 state,
202 &hook.workflow_id,
203 Some(hook.binding_id.clone()),
204 hook.actions.clone(),
205 workflow.source,
206 trigger_event,
207 source_event_id,
208 task_id,
209 dry_run,
210 )
211 .await
212}
213
214async fn execute_actions(
215 state: &AppState,
216 workflow_id: &str,
217 binding_id: Option<String>,
218 actions: Vec<WorkflowActionSpec>,
219 source: Option<WorkflowSourceRef>,
220 trigger_event: Option<String>,
221 source_event_id: Option<String>,
222 task_id: Option<String>,
223 dry_run: bool,
224) -> anyhow::Result<WorkflowRunRecord> {
225 let run_id = format!("workflow-run-{}", Uuid::new_v4());
226 let now = now_ms();
227 let mut run = WorkflowRunRecord {
228 run_id: run_id.clone(),
229 workflow_id: workflow_id.to_string(),
230 binding_id,
231 trigger_event: trigger_event.clone(),
232 source_event_id: source_event_id.clone(),
233 task_id: task_id.clone(),
234 status: if dry_run {
235 WorkflowRunStatus::DryRun
236 } else {
237 WorkflowRunStatus::Running
238 },
239 created_at_ms: now,
240 updated_at_ms: now,
241 finished_at_ms: if dry_run { Some(now) } else { None },
242 actions: actions
243 .iter()
244 .enumerate()
245 .map(|(idx, action)| WorkflowActionRunRecord {
246 action_id: format!("action_{}", idx + 1),
247 action: action.action.clone(),
248 task_id: task_id.clone(),
249 status: if dry_run {
250 WorkflowActionRunStatus::Skipped
251 } else {
252 WorkflowActionRunStatus::Pending
253 },
254 detail: None,
255 output: None,
256 updated_at_ms: now,
257 })
258 .collect(),
259 source,
260 };
261 state.put_workflow_run(run.clone()).await?;
262 let _ = crate::http::sync_workflow_run_blackboard(state, &run).await;
263 state.event_bus.publish(EngineEvent::new(
264 "workflow.run.started",
265 json!({
266 "runID": run.run_id,
267 "workflowID": run.workflow_id,
268 "bindingID": run.binding_id,
269 "triggerEvent": trigger_event,
270 "sourceEventID": source_event_id,
271 "taskID": task_id,
272 "dryRun": dry_run,
273 }),
274 ));
275 if dry_run {
276 return Ok(run);
277 }
278 for (action_row, action_spec) in run.actions.iter_mut().zip(actions.iter()) {
279 action_row.status = WorkflowActionRunStatus::Running;
280 action_row.updated_at_ms = now_ms();
281 let action_name = action_row.action.clone();
282 state
283 .update_workflow_run(&run.run_id, |row| {
284 if let Some(target) = row
285 .actions
286 .iter_mut()
287 .find(|item| item.action_id == action_row.action_id)
288 {
289 *target = action_row.clone();
290 }
291 })
292 .await;
293 if let Some(latest) = state.get_workflow_run(&run.run_id).await {
294 let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
295 }
296 state.event_bus.publish(EngineEvent::new(
297 "workflow.action.started",
298 json!({
299 "runID": run.run_id,
300 "workflowID": run.workflow_id,
301 "actionID": action_row.action_id,
302 "action": action_name,
303 "taskID": run.task_id,
304 }),
305 ));
306 match execute_action(
307 state,
308 workflow_id,
309 action_spec,
310 action_row,
311 trigger_event.clone(),
312 )
313 .await
314 {
315 Ok(output) => {
316 action_row.status = WorkflowActionRunStatus::Completed;
317 action_row.output = Some(output.clone());
318 action_row.updated_at_ms = now_ms();
319 state
320 .update_workflow_run(&run.run_id, |row| {
321 if let Some(target) = row
322 .actions
323 .iter_mut()
324 .find(|item| item.action_id == action_row.action_id)
325 {
326 *target = action_row.clone();
327 }
328 })
329 .await;
330 if let Some(latest) = state.get_workflow_run(&run.run_id).await {
331 let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
332 }
333 state.event_bus.publish(EngineEvent::new(
334 "workflow.action.completed",
335 json!({
336 "runID": run.run_id,
337 "workflowID": run.workflow_id,
338 "actionID": action_row.action_id,
339 "action": action_name,
340 "taskID": run.task_id,
341 "output": output,
342 }),
343 ));
344 }
345 Err(error) => {
346 let detail = error.to_string();
347 action_row.status = WorkflowActionRunStatus::Failed;
348 action_row.detail = Some(detail.clone());
349 action_row.updated_at_ms = now_ms();
350 run.status = WorkflowRunStatus::Failed;
351 run.finished_at_ms = Some(now_ms());
352 state
353 .update_workflow_run(&run.run_id, |row| {
354 row.status = WorkflowRunStatus::Failed;
355 row.finished_at_ms = Some(now_ms());
356 if let Some(target) = row
357 .actions
358 .iter_mut()
359 .find(|item| item.action_id == action_row.action_id)
360 {
361 *target = action_row.clone();
362 }
363 })
364 .await;
365 if let Some(latest) = state.get_workflow_run(&run.run_id).await {
366 let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
367 }
368 state.event_bus.publish(EngineEvent::new(
369 "workflow.action.failed",
370 json!({
371 "runID": run.run_id,
372 "workflowID": run.workflow_id,
373 "actionID": action_row.action_id,
374 "action": action_name,
375 "taskID": run.task_id,
376 "error": detail,
377 }),
378 ));
379 state.event_bus.publish(EngineEvent::new(
380 "workflow.run.failed",
381 json!({
382 "runID": run.run_id,
383 "workflowID": run.workflow_id,
384 "actionID": action_row.action_id,
385 "taskID": run.task_id,
386 "error": action_row.detail,
387 }),
388 ));
389 return state.get_workflow_run(&run.run_id).await.with_context(|| {
390 format!("workflow run `{}` missing after failure", run.run_id)
391 });
392 }
393 }
394 }
395 run.status = WorkflowRunStatus::Completed;
396 run.finished_at_ms = Some(now_ms());
397 let final_run = state
398 .update_workflow_run(&run.run_id, |row| {
399 row.status = WorkflowRunStatus::Completed;
400 row.finished_at_ms = Some(now_ms());
401 })
402 .await
403 .with_context(|| format!("workflow run `{}` missing on completion", run.run_id))?;
404 let _ = crate::http::sync_workflow_run_blackboard(state, &final_run).await;
405 state.event_bus.publish(EngineEvent::new(
406 "workflow.run.completed",
407 json!({
408 "runID": final_run.run_id,
409 "workflowID": final_run.workflow_id,
410 "bindingID": final_run.binding_id,
411 "taskID": final_run.task_id,
412 }),
413 ));
414 Ok(final_run)
415}
416
417async fn execute_action(
418 state: &AppState,
419 workflow_id: &str,
420 action_spec: &WorkflowActionSpec,
421 action_row: &WorkflowActionRunRecord,
422 trigger_event: Option<String>,
423) -> anyhow::Result<Value> {
424 let action_name = action_spec.action.as_str();
425 let parsed = parse_workflow_action(action_name);
426 match parsed {
427 ParsedWorkflowAction::EventEmit { event_type } => {
428 let payload = action_payload(action_spec, action_row);
429 state.event_bus.publish(EngineEvent::new(
430 event_type.clone(),
431 json!({
432 "workflowID": workflow_id,
433 "actionID": action_row.action_id,
434 "triggerEvent": trigger_event,
435 "payload": payload,
436 }),
437 ));
438 Ok(json!({ "eventType": event_type }))
439 }
440 ParsedWorkflowAction::ResourcePut { key } => {
441 let record = state
442 .put_shared_resource(
443 key.clone(),
444 action_payload(action_spec, action_row),
445 None,
446 "workflow".to_string(),
447 None,
448 )
449 .await
450 .map_err(|err| anyhow::anyhow!("{err:?}"))?;
451 Ok(json!({ "key": record.key, "rev": record.rev }))
452 }
453 ParsedWorkflowAction::ResourcePatch { key } => {
454 let current = state.get_shared_resource(&key).await;
455 let next_rev = current.as_ref().map(|row| row.rev);
456 let record = state
457 .put_shared_resource(
458 key.clone(),
459 merge_object(
460 current.map(|row| row.value).unwrap_or_else(|| json!({})),
461 action_payload(action_spec, action_row),
462 ),
463 next_rev,
464 "workflow".to_string(),
465 None,
466 )
467 .await
468 .map_err(|err| anyhow::anyhow!("{err:?}"))?;
469 Ok(json!({ "key": record.key, "rev": record.rev }))
470 }
471 ParsedWorkflowAction::ResourceDelete { key } => {
472 let deleted = state
473 .delete_shared_resource(&key, None)
474 .await
475 .map_err(|err| anyhow::anyhow!("{err:?}"))?;
476 Ok(json!({ "key": key, "deleted": deleted.is_some() }))
477 }
478 ParsedWorkflowAction::Tool { tool_name } => {
479 let result = state
480 .tools
481 .execute(&tool_name, action_payload(action_spec, action_row))
482 .await?;
483 Ok(json!({ "tool": tool_name, "output": result.output, "metadata": result.metadata }))
484 }
485 ParsedWorkflowAction::Capability { capability_id } => {
486 let bindings = state.capability_resolver.list_bindings().await?;
487 let tool_name = bindings
488 .bindings
489 .iter()
490 .find(|binding| binding.capability_id == capability_id)
491 .map(|binding| binding.tool_name.clone())
492 .unwrap_or_else(|| capability_id.clone());
493 let result = state
494 .tools
495 .execute(&tool_name, action_payload(action_spec, action_row))
496 .await?;
497 Ok(json!({
498 "capability": capability_id,
499 "tool": tool_name,
500 "output": result.output,
501 "metadata": result.metadata,
502 }))
503 }
504 ParsedWorkflowAction::Workflow { workflow_id } => {
505 anyhow::bail!("nested workflow action `{workflow_id}` is not supported in this slice")
506 }
507 ParsedWorkflowAction::Agent { agent_id } => {
508 let workspace_root = state.workspace_index.snapshot().await.root;
509 let session = Session::new(
510 Some(format!("Workflow {} / {}", workflow_id, agent_id)),
511 Some(workspace_root.clone()),
512 );
513 let session_id = session.id.clone();
514 state.storage.save_session(session).await?;
515 let prompt = action_spec
516 .with
517 .as_ref()
518 .and_then(|v| v.get("prompt"))
519 .and_then(|v| v.as_str())
520 .map(ToString::to_string)
521 .unwrap_or_else(|| format!("Execute workflow action `{}`", action_name));
522 let request = SendMessageRequest {
523 parts: vec![MessagePartInput::Text { text: prompt }],
524 model: None,
525 agent: Some(agent_id.clone()),
526 tool_mode: None,
527 tool_allowlist: None,
528 context_mode: None,
529 write_required: None,
530 };
531 state
532 .engine_loop
533 .run_prompt_async_with_context(
534 session_id.clone(),
535 request,
536 Some(format!("workflow:{workflow_id}")),
537 )
538 .await?;
539 Ok(json!({ "agentID": agent_id, "sessionID": session_id }))
540 }
541 }
542}
543
544fn action_payload(action_spec: &WorkflowActionSpec, action_row: &WorkflowActionRunRecord) -> Value {
545 action_spec
546 .with
547 .clone()
548 .unwrap_or_else(|| json!({ "action_id": action_row.action_id }))
549}
550
551fn merge_object(current: Value, patch: Value) -> Value {
552 if let (Some(mut current_obj), Some(patch_obj)) =
553 (current.as_object().cloned(), patch.as_object())
554 {
555 for (key, value) in patch_obj {
556 current_obj.insert(key.clone(), value.clone());
557 }
558 Value::Object(current_obj)
559 } else {
560 patch
561 }
562}
563
564fn source_event_id(event: &EngineEvent) -> String {
565 if let Some(id) = event.properties.get("event_id").and_then(|v| v.as_str()) {
566 return id.to_string();
567 }
568 for key in ["runID", "runId", "task_id", "taskID", "sessionID"] {
569 if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
570 return format!("{}:{id}", event.event_type);
571 }
572 }
573 format!("{}:{}", event.event_type, event.properties)
574}
575
576fn task_id_from_event(event: &EngineEvent) -> Option<String> {
577 for key in ["task_id", "taskID", "step_id", "stepID"] {
578 if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
579 let trimmed = id.trim();
580 if !trimmed.is_empty() {
581 return Some(trimmed.to_string());
582 }
583 }
584 }
585 None
586}
587
/// Compares two event names, ignoring surrounding whitespace and ASCII case.
///
/// Only ASCII letters are case-folded; any other character must match exactly.
fn event_name_matches(expected: &str, actual: &str) -> bool {
    let (wanted, seen) = (expected.trim(), actual.trim());
    wanted.len() == seen.len()
        && wanted
            .bytes()
            .zip(seen.bytes())
            .all(|(a, b)| a.to_ascii_lowercase() == b.to_ascii_lowercase())
}