Skip to main content

tandem_server/
workflows.rs

1use anyhow::Context;
2use serde_json::{json, Value};
3use tandem_types::{EngineEvent, MessagePartInput, SendMessageRequest, Session};
4use tandem_workflows::{
5    WorkflowActionRunRecord, WorkflowActionRunStatus, WorkflowActionSpec, WorkflowHookBinding,
6    WorkflowRunRecord, WorkflowRunStatus, WorkflowSimulationResult, WorkflowSpec,
7};
8use uuid::Uuid;
9
10use crate::{now_ms, AppState, WorkflowSourceRef};
11
/// A workflow action string decoded into the kind of target it addresses.
///
/// Values are produced by [`parse_workflow_action`]; every variant carries the text that
/// followed its prefix, with surrounding whitespace removed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParsedWorkflowAction {
    /// `event:<type>` — publish an event of the given type.
    EventEmit { event_type: String },
    /// `resource:put:<key>` — write a shared resource.
    ResourcePut { key: String },
    /// `resource:patch:<key>` — merge into a shared resource.
    ResourcePatch { key: String },
    /// `resource:delete:<key>` — delete a shared resource.
    ResourceDelete { key: String },
    /// `tool:<name>` — run a tool by name.
    Tool { tool_name: String },
    /// `capability:<id>` — also the fallback for strings with no recognised prefix.
    Capability { capability_id: String },
    /// `workflow:<id>` — reference to another workflow.
    Workflow { workflow_id: String },
    /// `agent:<id>` — hand the action to an agent.
    Agent { agent_id: String },
}
23
24pub fn parse_workflow_action(action: &str) -> ParsedWorkflowAction {
25    let trimmed = action.trim();
26    if let Some(rest) = trimmed.strip_prefix("event:") {
27        return ParsedWorkflowAction::EventEmit {
28            event_type: rest.trim().to_string(),
29        };
30    }
31    if let Some(rest) = trimmed.strip_prefix("resource:put:") {
32        return ParsedWorkflowAction::ResourcePut {
33            key: rest.trim().to_string(),
34        };
35    }
36    if let Some(rest) = trimmed.strip_prefix("resource:patch:") {
37        return ParsedWorkflowAction::ResourcePatch {
38            key: rest.trim().to_string(),
39        };
40    }
41    if let Some(rest) = trimmed.strip_prefix("resource:delete:") {
42        return ParsedWorkflowAction::ResourceDelete {
43            key: rest.trim().to_string(),
44        };
45    }
46    if let Some(rest) = trimmed.strip_prefix("tool:") {
47        return ParsedWorkflowAction::Tool {
48            tool_name: rest.trim().to_string(),
49        };
50    }
51    if let Some(rest) = trimmed.strip_prefix("capability:") {
52        return ParsedWorkflowAction::Capability {
53            capability_id: rest.trim().to_string(),
54        };
55    }
56    if let Some(rest) = trimmed.strip_prefix("workflow:") {
57        return ParsedWorkflowAction::Workflow {
58            workflow_id: rest.trim().to_string(),
59        };
60    }
61    if let Some(rest) = trimmed.strip_prefix("agent:") {
62        return ParsedWorkflowAction::Agent {
63            agent_id: rest.trim().to_string(),
64        };
65    }
66    ParsedWorkflowAction::Capability {
67        capability_id: trimmed.to_string(),
68    }
69}
70
71pub fn canonical_workflow_event_names(event: &EngineEvent) -> Vec<String> {
72    let mut names = vec![event.event_type.clone(), event.event_type.replace('.', "_")];
73    match event.event_type.as_str() {
74        "context.task.created" => names.push("task_created".to_string()),
75        "context.task.started" => names.push("task_started".to_string()),
76        "context.task.completed" => names.push("task_completed".to_string()),
77        "context.task.failed" => names.push("task_failed".to_string()),
78        "workflow.run.started" | "routine.run.created" => {
79            names.push("workflow_started".to_string())
80        }
81        "workflow.run.completed" | "routine.run.completed" => {
82            names.push("workflow_completed".to_string())
83        }
84        "workflow.run.failed" | "routine.run.failed" => names.push("task_failed".to_string()),
85        _ => {}
86    }
87    names.sort();
88    names.dedup();
89    names
90}
91
92pub async fn simulate_workflow_event(
93    state: &AppState,
94    event: &EngineEvent,
95) -> WorkflowSimulationResult {
96    let registry = state.workflow_registry().await;
97    let canonical = canonical_workflow_event_names(event);
98    let matched_bindings = registry
99        .hooks
100        .into_iter()
101        .filter(|hook| {
102            hook.enabled
103                && canonical
104                    .iter()
105                    .any(|name| event_name_matches(&hook.event, name))
106        })
107        .collect::<Vec<_>>();
108    let planned_actions = matched_bindings
109        .iter()
110        .flat_map(|hook| hook.actions.clone())
111        .collect::<Vec<_>>();
112    WorkflowSimulationResult {
113        matched_bindings,
114        planned_actions,
115        canonical_events: canonical,
116    }
117}
118
119pub async fn dispatch_workflow_event(state: &AppState, event: &EngineEvent) {
120    let simulation = simulate_workflow_event(state, event).await;
121    if simulation.matched_bindings.is_empty() {
122        return;
123    }
124    for hook in simulation.matched_bindings {
125        let source_event_id = source_event_id(event);
126        let task_id = task_id_from_event(event);
127        let dedupe_key = format!("{}::{source_event_id}", hook.binding_id);
128        {
129            let mut seen = state.workflow_dispatch_seen.write().await;
130            if seen.contains_key(&dedupe_key) {
131                continue;
132            }
133            seen.insert(dedupe_key, now_ms());
134        }
135        let _ = execute_hook_binding(
136            state,
137            &hook,
138            Some(event.event_type.clone()),
139            Some(source_event_id),
140            task_id,
141            false,
142        )
143        .await;
144    }
145}
146
147pub async fn run_workflow_dispatcher(state: AppState) {
148    let mut rx = state.event_bus.subscribe();
149    loop {
150        match rx.recv().await {
151            Ok(event) => dispatch_workflow_event(&state, &event).await,
152            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
153            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
154        }
155    }
156}
157
158pub async fn execute_workflow(
159    state: &AppState,
160    workflow: &WorkflowSpec,
161    trigger_event: Option<String>,
162    source_event_id: Option<String>,
163    task_id: Option<String>,
164    dry_run: bool,
165) -> anyhow::Result<WorkflowRunRecord> {
166    let actions = workflow
167        .steps
168        .iter()
169        .map(|step| WorkflowActionSpec {
170            action: step.action.clone(),
171            with: step.with.clone(),
172        })
173        .collect::<Vec<_>>();
174    execute_actions(
175        state,
176        &workflow.workflow_id,
177        None,
178        actions,
179        workflow.source.clone(),
180        trigger_event,
181        source_event_id,
182        task_id,
183        dry_run,
184    )
185    .await
186}
187
188pub async fn execute_hook_binding(
189    state: &AppState,
190    hook: &WorkflowHookBinding,
191    trigger_event: Option<String>,
192    source_event_id: Option<String>,
193    task_id: Option<String>,
194    dry_run: bool,
195) -> anyhow::Result<WorkflowRunRecord> {
196    let workflow = state
197        .get_workflow(&hook.workflow_id)
198        .await
199        .with_context(|| format!("unknown workflow `{}`", hook.workflow_id))?;
200    execute_actions(
201        state,
202        &hook.workflow_id,
203        Some(hook.binding_id.clone()),
204        hook.actions.clone(),
205        workflow.source,
206        trigger_event,
207        source_event_id,
208        task_id,
209        dry_run,
210    )
211    .await
212}
213
214async fn execute_actions(
215    state: &AppState,
216    workflow_id: &str,
217    binding_id: Option<String>,
218    actions: Vec<WorkflowActionSpec>,
219    source: Option<WorkflowSourceRef>,
220    trigger_event: Option<String>,
221    source_event_id: Option<String>,
222    task_id: Option<String>,
223    dry_run: bool,
224) -> anyhow::Result<WorkflowRunRecord> {
225    let run_id = format!("workflow-run-{}", Uuid::new_v4());
226    let now = now_ms();
227    let mut run = WorkflowRunRecord {
228        run_id: run_id.clone(),
229        workflow_id: workflow_id.to_string(),
230        binding_id,
231        trigger_event: trigger_event.clone(),
232        source_event_id: source_event_id.clone(),
233        task_id: task_id.clone(),
234        status: if dry_run {
235            WorkflowRunStatus::DryRun
236        } else {
237            WorkflowRunStatus::Running
238        },
239        created_at_ms: now,
240        updated_at_ms: now,
241        finished_at_ms: if dry_run { Some(now) } else { None },
242        actions: actions
243            .iter()
244            .enumerate()
245            .map(|(idx, action)| WorkflowActionRunRecord {
246                action_id: format!("action_{}", idx + 1),
247                action: action.action.clone(),
248                task_id: task_id.clone(),
249                status: if dry_run {
250                    WorkflowActionRunStatus::Skipped
251                } else {
252                    WorkflowActionRunStatus::Pending
253                },
254                detail: None,
255                output: None,
256                updated_at_ms: now,
257            })
258            .collect(),
259        source,
260    };
261    state.put_workflow_run(run.clone()).await?;
262    let _ = crate::http::sync_workflow_run_blackboard(state, &run).await;
263    state.event_bus.publish(EngineEvent::new(
264        "workflow.run.started",
265        json!({
266            "runID": run.run_id,
267            "workflowID": run.workflow_id,
268            "bindingID": run.binding_id,
269            "triggerEvent": trigger_event,
270            "sourceEventID": source_event_id,
271            "taskID": task_id,
272            "dryRun": dry_run,
273        }),
274    ));
275    if dry_run {
276        return Ok(run);
277    }
278    for (action_row, action_spec) in run.actions.iter_mut().zip(actions.iter()) {
279        action_row.status = WorkflowActionRunStatus::Running;
280        action_row.updated_at_ms = now_ms();
281        let action_name = action_row.action.clone();
282        state
283            .update_workflow_run(&run.run_id, |row| {
284                if let Some(target) = row
285                    .actions
286                    .iter_mut()
287                    .find(|item| item.action_id == action_row.action_id)
288                {
289                    *target = action_row.clone();
290                }
291            })
292            .await;
293        if let Some(latest) = state.get_workflow_run(&run.run_id).await {
294            let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
295        }
296        state.event_bus.publish(EngineEvent::new(
297            "workflow.action.started",
298            json!({
299                "runID": run.run_id,
300                "workflowID": run.workflow_id,
301                "actionID": action_row.action_id,
302                "action": action_name,
303                "taskID": run.task_id,
304            }),
305        ));
306        match execute_action(
307            state,
308            workflow_id,
309            action_spec,
310            action_row,
311            trigger_event.clone(),
312        )
313        .await
314        {
315            Ok(output) => {
316                action_row.status = WorkflowActionRunStatus::Completed;
317                action_row.output = Some(output.clone());
318                action_row.updated_at_ms = now_ms();
319                state
320                    .update_workflow_run(&run.run_id, |row| {
321                        if let Some(target) = row
322                            .actions
323                            .iter_mut()
324                            .find(|item| item.action_id == action_row.action_id)
325                        {
326                            *target = action_row.clone();
327                        }
328                    })
329                    .await;
330                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
331                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
332                }
333                state.event_bus.publish(EngineEvent::new(
334                    "workflow.action.completed",
335                    json!({
336                        "runID": run.run_id,
337                        "workflowID": run.workflow_id,
338                        "actionID": action_row.action_id,
339                        "action": action_name,
340                        "taskID": run.task_id,
341                        "output": output,
342                    }),
343                ));
344            }
345            Err(error) => {
346                let detail = error.to_string();
347                action_row.status = WorkflowActionRunStatus::Failed;
348                action_row.detail = Some(detail.clone());
349                action_row.updated_at_ms = now_ms();
350                run.status = WorkflowRunStatus::Failed;
351                run.finished_at_ms = Some(now_ms());
352                state
353                    .update_workflow_run(&run.run_id, |row| {
354                        row.status = WorkflowRunStatus::Failed;
355                        row.finished_at_ms = Some(now_ms());
356                        if let Some(target) = row
357                            .actions
358                            .iter_mut()
359                            .find(|item| item.action_id == action_row.action_id)
360                        {
361                            *target = action_row.clone();
362                        }
363                    })
364                    .await;
365                if let Some(latest) = state.get_workflow_run(&run.run_id).await {
366                    let _ = crate::http::sync_workflow_run_blackboard(state, &latest).await;
367                }
368                state.event_bus.publish(EngineEvent::new(
369                    "workflow.action.failed",
370                    json!({
371                        "runID": run.run_id,
372                        "workflowID": run.workflow_id,
373                        "actionID": action_row.action_id,
374                        "action": action_name,
375                        "taskID": run.task_id,
376                        "error": detail,
377                    }),
378                ));
379                state.event_bus.publish(EngineEvent::new(
380                    "workflow.run.failed",
381                    json!({
382                        "runID": run.run_id,
383                        "workflowID": run.workflow_id,
384                        "actionID": action_row.action_id,
385                        "taskID": run.task_id,
386                        "error": action_row.detail,
387                    }),
388                ));
389                return state.get_workflow_run(&run.run_id).await.with_context(|| {
390                    format!("workflow run `{}` missing after failure", run.run_id)
391                });
392            }
393        }
394    }
395    run.status = WorkflowRunStatus::Completed;
396    run.finished_at_ms = Some(now_ms());
397    let final_run = state
398        .update_workflow_run(&run.run_id, |row| {
399            row.status = WorkflowRunStatus::Completed;
400            row.finished_at_ms = Some(now_ms());
401        })
402        .await
403        .with_context(|| format!("workflow run `{}` missing on completion", run.run_id))?;
404    let _ = crate::http::sync_workflow_run_blackboard(state, &final_run).await;
405    state.event_bus.publish(EngineEvent::new(
406        "workflow.run.completed",
407        json!({
408            "runID": final_run.run_id,
409            "workflowID": final_run.workflow_id,
410            "bindingID": final_run.binding_id,
411            "taskID": final_run.task_id,
412        }),
413    ));
414    Ok(final_run)
415}
416
417async fn execute_action(
418    state: &AppState,
419    workflow_id: &str,
420    action_spec: &WorkflowActionSpec,
421    action_row: &WorkflowActionRunRecord,
422    trigger_event: Option<String>,
423) -> anyhow::Result<Value> {
424    let action_name = action_spec.action.as_str();
425    let parsed = parse_workflow_action(action_name);
426    match parsed {
427        ParsedWorkflowAction::EventEmit { event_type } => {
428            let payload = action_payload(action_spec, action_row);
429            state.event_bus.publish(EngineEvent::new(
430                event_type.clone(),
431                json!({
432                    "workflowID": workflow_id,
433                    "actionID": action_row.action_id,
434                    "triggerEvent": trigger_event,
435                    "payload": payload,
436                }),
437            ));
438            Ok(json!({ "eventType": event_type }))
439        }
440        ParsedWorkflowAction::ResourcePut { key } => {
441            let record = state
442                .put_shared_resource(
443                    key.clone(),
444                    action_payload(action_spec, action_row),
445                    None,
446                    "workflow".to_string(),
447                    None,
448                )
449                .await
450                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
451            Ok(json!({ "key": record.key, "rev": record.rev }))
452        }
453        ParsedWorkflowAction::ResourcePatch { key } => {
454            let current = state.get_shared_resource(&key).await;
455            let next_rev = current.as_ref().map(|row| row.rev);
456            let record = state
457                .put_shared_resource(
458                    key.clone(),
459                    merge_object(
460                        current.map(|row| row.value).unwrap_or_else(|| json!({})),
461                        action_payload(action_spec, action_row),
462                    ),
463                    next_rev,
464                    "workflow".to_string(),
465                    None,
466                )
467                .await
468                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
469            Ok(json!({ "key": record.key, "rev": record.rev }))
470        }
471        ParsedWorkflowAction::ResourceDelete { key } => {
472            let deleted = state
473                .delete_shared_resource(&key, None)
474                .await
475                .map_err(|err| anyhow::anyhow!("{err:?}"))?;
476            Ok(json!({ "key": key, "deleted": deleted.is_some() }))
477        }
478        ParsedWorkflowAction::Tool { tool_name } => {
479            let result = state
480                .tools
481                .execute(&tool_name, action_payload(action_spec, action_row))
482                .await?;
483            Ok(json!({ "tool": tool_name, "output": result.output, "metadata": result.metadata }))
484        }
485        ParsedWorkflowAction::Capability { capability_id } => {
486            let bindings = state.capability_resolver.list_bindings().await?;
487            let tool_name = bindings
488                .bindings
489                .iter()
490                .find(|binding| binding.capability_id == capability_id)
491                .map(|binding| binding.tool_name.clone())
492                .unwrap_or_else(|| capability_id.clone());
493            let result = state
494                .tools
495                .execute(&tool_name, action_payload(action_spec, action_row))
496                .await?;
497            Ok(json!({
498                "capability": capability_id,
499                "tool": tool_name,
500                "output": result.output,
501                "metadata": result.metadata,
502            }))
503        }
504        ParsedWorkflowAction::Workflow { workflow_id } => {
505            anyhow::bail!("nested workflow action `{workflow_id}` is not supported in this slice")
506        }
507        ParsedWorkflowAction::Agent { agent_id } => {
508            let workspace_root = state.workspace_index.snapshot().await.root;
509            let session = Session::new(
510                Some(format!("Workflow {} / {}", workflow_id, agent_id)),
511                Some(workspace_root.clone()),
512            );
513            let session_id = session.id.clone();
514            state.storage.save_session(session).await?;
515            let prompt = action_spec
516                .with
517                .as_ref()
518                .and_then(|v| v.get("prompt"))
519                .and_then(|v| v.as_str())
520                .map(ToString::to_string)
521                .unwrap_or_else(|| format!("Execute workflow action `{}`", action_name));
522            let request = SendMessageRequest {
523                parts: vec![MessagePartInput::Text { text: prompt }],
524                model: None,
525                agent: Some(agent_id.clone()),
526                tool_mode: None,
527                tool_allowlist: None,
528                context_mode: None,
529                write_required: None,
530            };
531            state
532                .engine_loop
533                .run_prompt_async_with_context(
534                    session_id.clone(),
535                    request,
536                    Some(format!("workflow:{workflow_id}")),
537                )
538                .await?;
539            Ok(json!({ "agentID": agent_id, "sessionID": session_id }))
540        }
541    }
542}
543
544fn action_payload(action_spec: &WorkflowActionSpec, action_row: &WorkflowActionRunRecord) -> Value {
545    action_spec
546        .with
547        .clone()
548        .unwrap_or_else(|| json!({ "action_id": action_row.action_id }))
549}
550
551fn merge_object(current: Value, patch: Value) -> Value {
552    if let (Some(mut current_obj), Some(patch_obj)) =
553        (current.as_object().cloned(), patch.as_object())
554    {
555        for (key, value) in patch_obj {
556            current_obj.insert(key.clone(), value.clone());
557        }
558        Value::Object(current_obj)
559    } else {
560        patch
561    }
562}
563
564fn source_event_id(event: &EngineEvent) -> String {
565    if let Some(id) = event.properties.get("event_id").and_then(|v| v.as_str()) {
566        return id.to_string();
567    }
568    for key in ["runID", "runId", "task_id", "taskID", "sessionID"] {
569        if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
570            return format!("{}:{id}", event.event_type);
571        }
572    }
573    format!("{}:{}", event.event_type, event.properties)
574}
575
576fn task_id_from_event(event: &EngineEvent) -> Option<String> {
577    for key in ["task_id", "taskID", "step_id", "stepID"] {
578        if let Some(id) = event.properties.get(key).and_then(|v| v.as_str()) {
579            let trimmed = id.trim();
580            if !trimmed.is_empty() {
581                return Some(trimmed.to_string());
582            }
583        }
584    }
585    None
586}
587
/// Tells whether a hook's event name (`expected`) matches a canonical event name (`actual`).
///
/// Surrounding whitespace is ignored on both sides and ASCII letters compare without
/// regard to case; all other bytes must be identical.
fn event_name_matches(expected: &str, actual: &str) -> bool {
    let (want, got) = (expected.trim(), actual.trim());
    want.len() == got.len()
        && want
            .bytes()
            .zip(got.bytes())
            .all(|(lhs, rhs)| lhs.eq_ignore_ascii_case(&rhs))
}