Skip to main content

tandem_workflows/
lib.rs

1use anyhow::Context;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5use std::fs;
6use std::path::{Path, PathBuf};
7
8use tandem_orchestrator::KnowledgeBinding;
9use tandem_types::TenantContext;
10
11mod mission_builder;
12pub mod plan_package;
13
14pub use mission_builder::{
15    validate_mission_blueprint, ApprovalDecision, HumanApprovalGate, InputRefBlueprint,
16    MissionBlueprint, MissionMilestoneBlueprint, MissionPhaseBlueprint, MissionPhaseExecutionMode,
17    MissionTeamBlueprint, OutputContractBlueprint, ReviewStage, ReviewStageKind, ValidationMessage,
18    ValidationSeverity, WorkstreamBlueprint,
19};
20pub use plan_package::{
21    AutomationV2Schedule, AutomationV2ScheduleType, WorkflowPlan, WorkflowPlanChatMessage,
22    WorkflowPlanConversation, WorkflowPlanDraftRecord, WorkflowPlanStep,
23};
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
26#[serde(rename_all = "snake_case")]
27pub enum WorkflowSourceKind {
28    BuiltIn,
29    Pack,
30    Workspace,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
34pub struct WorkflowSourceRef {
35    pub kind: WorkflowSourceKind,
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub pack_id: Option<String>,
38    #[serde(default, skip_serializing_if = "Option::is_none")]
39    pub path: Option<String>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
43pub struct WorkflowActionSpec {
44    pub action: String,
45    #[serde(default, skip_serializing_if = "Option::is_none")]
46    pub with: Option<Value>,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
50pub struct WorkflowStepSpec {
51    pub step_id: String,
52    pub action: String,
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub with: Option<Value>,
55    #[serde(default)]
56    pub knowledge: KnowledgeBinding,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
60pub struct WorkflowHookBinding {
61    pub binding_id: String,
62    pub workflow_id: String,
63    pub event: String,
64    #[serde(default = "default_true")]
65    pub enabled: bool,
66    #[serde(default)]
67    pub actions: Vec<WorkflowActionSpec>,
68    #[serde(default, skip_serializing_if = "Option::is_none")]
69    pub source: Option<WorkflowSourceRef>,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
73pub struct WorkflowSpec {
74    pub workflow_id: String,
75    pub name: String,
76    #[serde(default, skip_serializing_if = "Option::is_none")]
77    pub description: Option<String>,
78    #[serde(default = "default_true")]
79    pub enabled: bool,
80    #[serde(default)]
81    pub knowledge: KnowledgeBinding,
82    #[serde(default)]
83    pub steps: Vec<WorkflowStepSpec>,
84    #[serde(default)]
85    pub hooks: Vec<WorkflowHookBinding>,
86    #[serde(default, skip_serializing_if = "Option::is_none")]
87    pub source: Option<WorkflowSourceRef>,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
91pub struct WorkflowRegistry {
92    #[serde(default)]
93    pub workflows: HashMap<String, WorkflowSpec>,
94    #[serde(default)]
95    pub hooks: Vec<WorkflowHookBinding>,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
99#[serde(rename_all = "snake_case")]
100pub enum WorkflowRunStatus {
101    Queued,
102    Running,
103    Completed,
104    Failed,
105    DryRun,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
109#[serde(rename_all = "snake_case")]
110pub enum WorkflowActionRunStatus {
111    Pending,
112    Running,
113    Completed,
114    Failed,
115    Skipped,
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
119pub struct WorkflowActionRunRecord {
120    pub action_id: String,
121    pub action: String,
122    #[serde(default, skip_serializing_if = "Option::is_none")]
123    pub task_id: Option<String>,
124    pub status: WorkflowActionRunStatus,
125    #[serde(default, skip_serializing_if = "Option::is_none")]
126    pub detail: Option<String>,
127    #[serde(default, skip_serializing_if = "Option::is_none")]
128    pub output: Option<Value>,
129    pub updated_at_ms: u64,
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
133pub struct WorkflowRunRecord {
134    pub run_id: String,
135    pub workflow_id: String,
136    #[serde(default = "default_tenant_context")]
137    pub tenant_context: TenantContext,
138    #[serde(default, skip_serializing_if = "Option::is_none")]
139    pub automation_id: Option<String>,
140    #[serde(default, skip_serializing_if = "Option::is_none")]
141    pub automation_run_id: Option<String>,
142    #[serde(default, skip_serializing_if = "Option::is_none")]
143    pub binding_id: Option<String>,
144    #[serde(default, skip_serializing_if = "Option::is_none")]
145    pub trigger_event: Option<String>,
146    #[serde(default, skip_serializing_if = "Option::is_none")]
147    pub source_event_id: Option<String>,
148    #[serde(default, skip_serializing_if = "Option::is_none")]
149    pub task_id: Option<String>,
150    pub status: WorkflowRunStatus,
151    pub created_at_ms: u64,
152    pub updated_at_ms: u64,
153    #[serde(default, skip_serializing_if = "Option::is_none")]
154    pub finished_at_ms: Option<u64>,
155    #[serde(default)]
156    pub actions: Vec<WorkflowActionRunRecord>,
157    #[serde(default, skip_serializing_if = "Option::is_none")]
158    pub source: Option<WorkflowSourceRef>,
159}
160
161fn default_tenant_context() -> TenantContext {
162    TenantContext::local_implicit()
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
166pub struct WorkflowSimulationResult {
167    #[serde(default)]
168    pub matched_bindings: Vec<WorkflowHookBinding>,
169    #[serde(default)]
170    pub planned_actions: Vec<WorkflowActionSpec>,
171    #[serde(default)]
172    pub canonical_events: Vec<String>,
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
176#[serde(rename_all = "snake_case")]
177pub enum WorkflowValidationSeverity {
178    Info,
179    Warning,
180    Error,
181}
182
183#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
184pub struct WorkflowValidationMessage {
185    pub severity: WorkflowValidationSeverity,
186    pub message: String,
187}
188
189#[derive(Debug, Clone)]
190pub struct WorkflowLoadSource {
191    pub root: PathBuf,
192    pub kind: WorkflowSourceKind,
193    pub pack_id: Option<String>,
194}
195
196#[derive(Debug, Deserialize)]
197struct WorkflowFileEnvelope {
198    #[serde(default)]
199    workflow: Option<WorkflowFileShape>,
200    #[serde(default)]
201    hooks: Option<Value>,
202}
203
204#[derive(Debug, Deserialize)]
205struct WorkflowFileShape {
206    #[serde(default)]
207    id: Option<String>,
208    #[serde(default)]
209    workflow_id: Option<String>,
210    #[serde(default)]
211    name: Option<String>,
212    #[serde(default)]
213    description: Option<String>,
214    #[serde(default)]
215    enabled: Option<bool>,
216    #[serde(default)]
217    steps: Vec<WorkflowStepInput>,
218    #[serde(default)]
219    hooks: Option<Value>,
220}
221
222#[derive(Debug, Deserialize)]
223#[serde(untagged)]
224enum WorkflowStepInput {
225    String(String),
226    Object(WorkflowStepObjectInput),
227}
228
229#[derive(Debug, Deserialize)]
230struct WorkflowStepObjectInput {
231    #[serde(default)]
232    id: Option<String>,
233    #[serde(default)]
234    step_id: Option<String>,
235    action: String,
236    #[serde(default)]
237    with: Option<Value>,
238}
239
240#[derive(Debug, Deserialize)]
241#[serde(untagged)]
242enum HookFileShape {
243    Map(HashMap<String, Vec<HookActionInput>>),
244    List(Vec<HookBindingInput>),
245}
246
247#[derive(Debug, Deserialize)]
248struct HookBindingInput {
249    #[serde(default)]
250    id: Option<String>,
251    #[serde(default)]
252    binding_id: Option<String>,
253    #[serde(default)]
254    workflow: Option<String>,
255    #[serde(default)]
256    workflow_id: Option<String>,
257    event: String,
258    #[serde(default)]
259    enabled: Option<bool>,
260    #[serde(default)]
261    actions: Vec<HookActionInput>,
262}
263
264#[derive(Debug, Clone, Deserialize)]
265#[serde(untagged)]
266enum HookActionInput {
267    String(String),
268    Object(WorkflowActionSpec),
269}
270
271pub fn load_registry(sources: &[WorkflowLoadSource]) -> anyhow::Result<WorkflowRegistry> {
272    let mut registry = WorkflowRegistry::default();
273    for source in sources {
274        load_source_into(&mut registry, source)?;
275    }
276    Ok(registry)
277}
278
279pub fn validate_registry(registry: &WorkflowRegistry) -> Vec<WorkflowValidationMessage> {
280    let mut messages = Vec::new();
281    for workflow in registry.workflows.values() {
282        if workflow.steps.is_empty()
283            && registry
284                .hooks
285                .iter()
286                .all(|hook| hook.workflow_id != workflow.workflow_id)
287        {
288            messages.push(WorkflowValidationMessage {
289                severity: WorkflowValidationSeverity::Warning,
290                message: format!(
291                    "workflow `{}` has no steps and no hook bindings",
292                    workflow.workflow_id
293                ),
294            });
295        }
296        for step in &workflow.steps {
297            if step.action.trim().is_empty() {
298                messages.push(WorkflowValidationMessage {
299                    severity: WorkflowValidationSeverity::Error,
300                    message: format!(
301                        "workflow `{}` has step `{}` with empty action",
302                        workflow.workflow_id, step.step_id
303                    ),
304                });
305            }
306        }
307    }
308    for hook in &registry.hooks {
309        if !registry.workflows.contains_key(&hook.workflow_id) {
310            messages.push(WorkflowValidationMessage {
311                severity: WorkflowValidationSeverity::Error,
312                message: format!(
313                    "hook `{}` references unknown workflow `{}`",
314                    hook.binding_id, hook.workflow_id
315                ),
316            });
317        }
318        if hook.actions.is_empty() {
319            messages.push(WorkflowValidationMessage {
320                severity: WorkflowValidationSeverity::Warning,
321                message: format!("hook `{}` has no actions", hook.binding_id),
322            });
323        }
324    }
325    messages
326}
327
328fn load_source_into(
329    registry: &mut WorkflowRegistry,
330    source: &WorkflowLoadSource,
331) -> anyhow::Result<()> {
332    for entry in collect_yaml_files(&source.root.join("workflows"))? {
333        let workflow = load_workflow_file(&entry, source)?;
334        registry
335            .workflows
336            .insert(workflow.workflow_id.clone(), workflow.clone());
337        registry.hooks.retain(|hook| {
338            hook.workflow_id != workflow.workflow_id
339                || !matches!(
340                    hook.source.as_ref(),
341                    Some(src) if src.path.as_deref() == Some(entry.to_string_lossy().as_ref())
342                )
343        });
344        registry.hooks.extend(workflow.hooks.clone());
345    }
346    for entry in collect_yaml_files(&source.root.join("hooks"))? {
347        registry.hooks.extend(load_hook_file(&entry, source)?);
348    }
349    Ok(())
350}
351
352fn collect_yaml_files(dir: &Path) -> anyhow::Result<Vec<PathBuf>> {
353    let mut files = Vec::new();
354    let entries = match fs::read_dir(dir) {
355        Ok(entries) => entries,
356        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(files),
357        Err(err) => return Err(err.into()),
358    };
359    for entry in entries {
360        let path = entry?.path();
361        if path.is_dir() {
362            files.extend(collect_yaml_files(&path)?);
363            continue;
364        }
365        let ext = path
366            .extension()
367            .and_then(|v| v.to_str())
368            .unwrap_or_default();
369        if matches!(ext, "yaml" | "yml") {
370            files.push(path);
371        }
372    }
373    files.sort();
374    Ok(files)
375}
376
377fn load_workflow_file(path: &Path, source: &WorkflowLoadSource) -> anyhow::Result<WorkflowSpec> {
378    let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
379    let parsed = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
380        .with_context(|| format!("parse workflow yaml {}", path.display()))?;
381    let workflow = parsed
382        .workflow
383        .ok_or_else(|| anyhow::anyhow!("missing `workflow` key"))?;
384    let workflow_id = workflow
385        .workflow_id
386        .or(workflow.id)
387        .or_else(|| {
388            path.file_stem()
389                .and_then(|v| v.to_str())
390                .map(ToString::to_string)
391        })
392        .ok_or_else(|| anyhow::anyhow!("workflow id missing"))?;
393    let name = workflow.name.clone().unwrap_or_else(|| workflow_id.clone());
394    let source_ref = source_ref(source, path);
395    let steps = workflow
396        .steps
397        .into_iter()
398        .enumerate()
399        .map(|(idx, step)| match step {
400            WorkflowStepInput::String(action) => WorkflowStepSpec {
401                step_id: format!("step_{}", idx + 1),
402                action,
403                with: None,
404                knowledge: KnowledgeBinding::default(),
405            },
406            WorkflowStepInput::Object(step) => WorkflowStepSpec {
407                step_id: step
408                    .step_id
409                    .or(step.id)
410                    .unwrap_or_else(|| format!("step_{}", idx + 1)),
411                action: step.action,
412                with: step.with,
413                knowledge: KnowledgeBinding::default(),
414            },
415        })
416        .collect::<Vec<_>>();
417    let mut hooks = parse_hooks_value(
418        workflow.hooks.as_ref().or(parsed.hooks.as_ref()),
419        &workflow_id,
420        &source_ref,
421    )?;
422    for hook in &mut hooks {
423        if hook.workflow_id.is_empty() {
424            hook.workflow_id = workflow_id.clone();
425        }
426    }
427    Ok(WorkflowSpec {
428        workflow_id,
429        name,
430        description: workflow.description,
431        enabled: workflow.enabled.unwrap_or(true),
432        knowledge: KnowledgeBinding::default(),
433        steps,
434        hooks,
435        source: Some(source_ref),
436    })
437}
438
439fn load_hook_file(
440    path: &Path,
441    source: &WorkflowLoadSource,
442) -> anyhow::Result<Vec<WorkflowHookBinding>> {
443    let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
444    let env = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
445        .with_context(|| format!("parse hook yaml {}", path.display()))?;
446    parse_hooks_value(env.hooks.as_ref(), "", &source_ref(source, path))
447}
448
449fn parse_hooks_value(
450    hooks_value: Option<&Value>,
451    default_workflow_id: &str,
452    source_ref: &WorkflowSourceRef,
453) -> anyhow::Result<Vec<WorkflowHookBinding>> {
454    let Some(hooks_value) = hooks_value else {
455        return Ok(Vec::new());
456    };
457    let shape = serde_json::from_value::<HookFileShape>(hooks_value.clone())
458        .or_else(|_| serde_yaml::from_value::<HookFileShape>(serde_yaml::to_value(hooks_value)?))
459        .context("parse hooks")?;
460    let mut out = Vec::new();
461    match shape {
462        HookFileShape::Map(map) => {
463            for (event, actions) in map {
464                out.push(WorkflowHookBinding {
465                    binding_id: format!(
466                        "{}.{}",
467                        default_workflow_id_or_default(default_workflow_id),
468                        normalize_ident(&event)
469                    ),
470                    workflow_id: default_workflow_id.to_string(),
471                    event,
472                    enabled: true,
473                    actions: actions.into_iter().map(to_action_spec).collect(),
474                    source: Some(source_ref.clone()),
475                });
476            }
477        }
478        HookFileShape::List(items) => {
479            for item in items {
480                out.push(WorkflowHookBinding {
481                    binding_id: item.binding_id.or(item.id).unwrap_or_else(|| {
482                        format!(
483                            "{}.{}",
484                            item.workflow_id
485                                .clone()
486                                .or(item.workflow.clone())
487                                .unwrap_or_else(|| default_workflow_id_or_default(
488                                    default_workflow_id
489                                )),
490                            normalize_ident(&item.event)
491                        )
492                    }),
493                    workflow_id: item
494                        .workflow_id
495                        .or(item.workflow)
496                        .unwrap_or_else(|| default_workflow_id.to_string()),
497                    event: item.event,
498                    enabled: item.enabled.unwrap_or(true),
499                    actions: item.actions.into_iter().map(to_action_spec).collect(),
500                    source: Some(source_ref.clone()),
501                });
502            }
503        }
504    }
505    Ok(out)
506}
507
/// Returns `workflow_id` as an owned string, or the literal `workflow` when
/// the id is empty or only whitespace.
///
/// A non-blank id is returned unchanged, including any surrounding whitespace.
fn default_workflow_id_or_default(workflow_id: &str) -> String {
    let is_blank = workflow_id.trim().is_empty();
    let chosen = if is_blank { "workflow" } else { workflow_id };
    chosen.to_string()
}
515
516fn to_action_spec(input: HookActionInput) -> WorkflowActionSpec {
517    match input {
518        HookActionInput::String(action) => WorkflowActionSpec { action, with: None },
519        HookActionInput::Object(spec) => spec,
520    }
521}
522
/// Normalizes an event name for use inside a generated binding id.
///
/// Leading and trailing whitespace is trimmed, ASCII letters are lowercased,
/// and each space, `/`, or `.` is replaced by `_`. All other characters
/// (including non-ASCII ones) are kept as they are.
fn normalize_ident(input: &str) -> String {
    input
        .trim()
        .chars()
        .map(|ch| match ch {
            ' ' | '/' | '.' => '_',
            other => other.to_ascii_lowercase(),
        })
        .collect()
}
529
530fn source_ref(source: &WorkflowLoadSource, path: &Path) -> WorkflowSourceRef {
531    WorkflowSourceRef {
532        kind: source.kind.clone(),
533        pack_id: source.pack_id.clone(),
534        path: Some(path.to_string_lossy().to_string()),
535    }
536}
537
/// Serde default helper for `enabled` flags: always `true`.
fn default_true() -> bool {
    true
}
541
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A workflow file with two steps (one shorthand, one object) and a
    /// map-style `hooks` section loads into one workflow and one binding
    /// carrying both actions.
    #[test]
    fn loads_workflow_with_embedded_hooks() {
        const DEMO_YAML: &str = r#"
workflow:
  id: build_feature
  name: Build Feature
  steps:
    - planner
    - action: verifier.run
      with:
        strict: true
  hooks:
    task_created:
      - kanban.update
      - action: slack.notify
        with:
          channel: engineering
"#;

        let root = tempdir().expect("dir");
        let workflows_root = root.path().join("workflows");
        fs::create_dir_all(&workflows_root).expect("mkdir");
        fs::write(workflows_root.join("demo.yaml"), DEMO_YAML).expect("write");

        let sources = [WorkflowLoadSource {
            root: root.path().to_path_buf(),
            kind: WorkflowSourceKind::Workspace,
            pack_id: None,
        }];
        let registry = load_registry(&sources).expect("registry");

        let loaded = registry.workflows.get("build_feature").expect("workflow");
        assert_eq!(loaded.steps.len(), 2);
        assert_eq!(registry.hooks.len(), 1);
        assert_eq!(registry.hooks[0].actions.len(), 2);
    }
}
583}