Skip to main content

tandem_workflows/
lib.rs

1use anyhow::Context;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5use std::fs;
6use std::path::{Path, PathBuf};
7
8use tandem_orchestrator::KnowledgeBinding;
9
10mod mission_builder;
11pub mod plan_package;
12
13pub use mission_builder::{
14    validate_mission_blueprint, ApprovalDecision, HumanApprovalGate, InputRefBlueprint,
15    MissionBlueprint, MissionMilestoneBlueprint, MissionPhaseBlueprint, MissionPhaseExecutionMode,
16    MissionTeamBlueprint, OutputContractBlueprint, ReviewStage, ReviewStageKind, ValidationMessage,
17    ValidationSeverity, WorkstreamBlueprint,
18};
19pub use plan_package::{
20    AutomationV2Schedule, AutomationV2ScheduleType, WorkflowPlan, WorkflowPlanChatMessage,
21    WorkflowPlanConversation, WorkflowPlanDraftRecord, WorkflowPlanStep,
22};
23
/// Category of location a workflow definition is attributed to.
///
/// Serialized with snake_case variant names (`built_in`, `pack`, `workspace`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowSourceKind {
    // NOTE(review): presumably definitions shipped with the application — confirm.
    BuiltIn,
    // NOTE(review): presumably definitions contributed by an installed pack
    // (see `pack_id` on `WorkflowSourceRef`) — confirm.
    Pack,
    // NOTE(review): presumably definitions found in the user's workspace — confirm.
    Workspace,
}
31
/// Provenance attached to workflows, hook bindings and run records.
///
/// Both optional fields are omitted from serialized output when `None` and
/// default to `None` when absent on input.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowSourceRef {
    /// Which category of source the item came from.
    pub kind: WorkflowSourceKind,
    /// Pack identifier copied from the load source, if it had one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pack_id: Option<String>,
    /// Path of the defining file; the loader in this file stores the lossy
    /// UTF-8 rendering of the file path here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
}
40
/// A single action attached to a hook binding: an action name plus an
/// optional free-form value.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowActionSpec {
    /// Name of the action (e.g. `slack.notify` in the test fixture of this file).
    pub action: String,
    /// Optional JSON value accompanying the action; omitted from output when `None`.
    // NOTE(review): presumably the action's arguments — confirm against the executor.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub with: Option<Value>,
}
47
/// One step of a workflow.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowStepSpec {
    /// Identifier of the step; the file loader generates `step_<n>` (1-based)
    /// when the file does not provide one.
    pub step_id: String,
    /// Name of the action this step runs.
    pub action: String,
    /// Optional JSON value accompanying the action; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub with: Option<Value>,
    /// Knowledge binding for the step; falls back to `KnowledgeBinding::default()`
    /// when absent on input. The file loader in this module always uses the default.
    #[serde(default)]
    pub knowledge: KnowledgeBinding,
}
57
/// Binds an event name to a list of actions on behalf of a workflow.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowHookBinding {
    /// Identifier of the binding; the loader derives `<workflow>.<event>` when
    /// the file gives none.
    pub binding_id: String,
    /// Identifier of the workflow this binding belongs to. `validate_registry`
    /// reports an error when it does not name a registered workflow.
    pub workflow_id: String,
    /// Name of the event the binding reacts to.
    pub event: String,
    /// Whether the binding is active; defaults to `true` when absent on input.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Actions associated with the event; defaults to an empty list.
    #[serde(default)]
    pub actions: Vec<WorkflowActionSpec>,
    /// Where the binding was defined; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<WorkflowSourceRef>,
}
70
/// A fully resolved workflow definition as stored in the registry.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowSpec {
    /// Identifier of the workflow; used as the key in `WorkflowRegistry::workflows`.
    pub workflow_id: String,
    /// Display name; the file loader falls back to the workflow id when the
    /// file has no `name`.
    pub name: String,
    /// Optional description; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Whether the workflow is active; defaults to `true` when absent on input.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Workflow-level knowledge binding; defaults to `KnowledgeBinding::default()`.
    #[serde(default)]
    pub knowledge: KnowledgeBinding,
    /// Steps of the workflow, in declaration order.
    #[serde(default)]
    pub steps: Vec<WorkflowStepSpec>,
    /// Hook bindings declared alongside the workflow.
    #[serde(default)]
    pub hooks: Vec<WorkflowHookBinding>,
    /// Where the workflow was defined; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<WorkflowSourceRef>,
}
88
/// In-memory collection of loaded workflows and hook bindings.
///
/// `Default` yields an empty registry; `load_registry` fills one from disk.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct WorkflowRegistry {
    /// Workflows keyed by their `workflow_id`; a later insert with the same id
    /// replaces the earlier entry.
    #[serde(default)]
    pub workflows: HashMap<String, WorkflowSpec>,
    /// All hook bindings, both those embedded in workflow files and those from
    /// standalone hook files.
    #[serde(default)]
    pub hooks: Vec<WorkflowHookBinding>,
}
96
/// Status of a workflow run as recorded in `WorkflowRunRecord`.
///
/// Serialized with snake_case variant names (e.g. `dry_run`).
// NOTE(review): the transitions between these states are not defined in this
// file — consult the run executor before relying on any ordering.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowRunStatus {
    Queued,
    Running,
    Completed,
    Failed,
    DryRun,
}
106
/// Status of a single action within a workflow run, as recorded in
/// `WorkflowActionRunRecord`.
///
/// Serialized with snake_case variant names.
// NOTE(review): state transitions are decided outside this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowActionRunStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Skipped,
}
116
/// Record of one action's execution inside a workflow run.
///
/// Optional fields are omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowActionRunRecord {
    /// Identifier of this action instance within the run.
    pub action_id: String,
    /// Name of the action that was (or will be) run.
    pub action: String,
    /// Optional task identifier associated with the action.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// Current status of the action.
    pub status: WorkflowActionRunStatus,
    /// Optional free-text detail.
    // NOTE(review): presumably an error or progress message — confirm with the writer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Optional JSON output attached to the action.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output: Option<Value>,
    /// Time of the last update.
    // NOTE(review): the `_ms` suffix suggests milliseconds since the Unix epoch — confirm.
    pub updated_at_ms: u64,
}
130
/// Record of a single workflow run and the actions it contains.
///
/// Optional fields are omitted from serialized output when `None` and default
/// to `None` when absent on input.
// NOTE(review): all `*_ms` fields look like milliseconds since the Unix epoch;
// nothing in this file sets them, so confirm with the code that writes records.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowRunRecord {
    /// Identifier of the run.
    pub run_id: String,
    /// Identifier of the workflow that was run.
    pub workflow_id: String,
    /// Optional identifier of an associated automation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub automation_id: Option<String>,
    /// Optional identifier of an associated automation run.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub automation_run_id: Option<String>,
    /// Optional identifier of the hook binding involved in the run.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub binding_id: Option<String>,
    /// Optional name of the event that triggered the run.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trigger_event: Option<String>,
    /// Optional identifier of the originating event.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_event_id: Option<String>,
    /// Optional task identifier associated with the run.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// Overall status of the run.
    pub status: WorkflowRunStatus,
    /// Creation time of the record.
    pub created_at_ms: u64,
    /// Time of the last update to the record.
    pub updated_at_ms: u64,
    /// Completion time, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    /// Per-action records; defaults to an empty list.
    #[serde(default)]
    pub actions: Vec<WorkflowActionRunRecord>,
    /// Provenance of the workflow definition that was run.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<WorkflowSourceRef>,
}
157
/// Result of simulating workflow hooks; every list defaults to empty when
/// absent on input.
// NOTE(review): the simulator itself is not in this file — the field meanings
// below are inferred from their names and types; confirm with the producer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowSimulationResult {
    /// Hook bindings reported as matching.
    #[serde(default)]
    pub matched_bindings: Vec<WorkflowHookBinding>,
    /// Actions reported as planned.
    #[serde(default)]
    pub planned_actions: Vec<WorkflowActionSpec>,
    /// Event names reported in canonical form.
    #[serde(default)]
    pub canonical_events: Vec<String>,
}
167
/// Severity attached to a `WorkflowValidationMessage`.
///
/// Serialized with snake_case variant names (`info`, `warning`, `error`).
/// `validate_registry` in this file emits only `Warning` and `Error`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowValidationSeverity {
    Info,
    Warning,
    Error,
}
175
/// A single finding produced by `validate_registry`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowValidationMessage {
    /// How serious the finding is.
    pub severity: WorkflowValidationSeverity,
    /// Human-readable description of the finding.
    pub message: String,
}
181
/// A directory to load workflow and hook files from, together with the
/// provenance to stamp on everything found there.
#[derive(Debug, Clone)]
pub struct WorkflowLoadSource {
    /// Root directory; the loader reads YAML files under `<root>/workflows`
    /// and `<root>/hooks`.
    pub root: PathBuf,
    /// Source kind copied into each loaded item's `WorkflowSourceRef`.
    pub kind: WorkflowSourceKind,
    /// Pack identifier copied into each loaded item's `WorkflowSourceRef`.
    pub pack_id: Option<String>,
}
188
/// Top-level shape of a workflow or hook YAML file.
///
/// Both keys are optional at parse time; `load_workflow_file` rejects files
/// without `workflow`, while `load_hook_file` reads only `hooks`.
#[derive(Debug, Deserialize)]
struct WorkflowFileEnvelope {
    /// Contents of the `workflow` key, if present.
    #[serde(default)]
    workflow: Option<WorkflowFileShape>,
    /// Raw contents of the top-level `hooks` key, decoded later by
    /// `parse_hooks_value`.
    #[serde(default)]
    hooks: Option<Value>,
}
196
/// Shape of the `workflow` key inside a workflow YAML file. Every field is
/// optional; `load_workflow_file` supplies the fallbacks.
#[derive(Debug, Deserialize)]
struct WorkflowFileShape {
    /// Alternative spelling of the workflow id; used only when `workflow_id`
    /// is absent.
    #[serde(default)]
    id: Option<String>,
    /// Preferred spelling of the workflow id.
    #[serde(default)]
    workflow_id: Option<String>,
    /// Display name; the loader falls back to the workflow id.
    #[serde(default)]
    name: Option<String>,
    /// Optional description, passed through unchanged.
    #[serde(default)]
    description: Option<String>,
    /// Enabled flag; the loader treats a missing value as `true`.
    #[serde(default)]
    enabled: Option<bool>,
    /// Steps, each either a bare action string or an object.
    #[serde(default)]
    steps: Vec<WorkflowStepInput>,
    /// Hooks embedded in the workflow; take precedence over the envelope's
    /// top-level `hooks` key.
    #[serde(default)]
    hooks: Option<Value>,
}
214
/// A step as written in a workflow file.
///
/// Untagged: the shape of the value selects the variant, with variants tried
/// in declaration order.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum WorkflowStepInput {
    /// Shorthand form: just the action name.
    String(String),
    /// Full form with optional id and `with` value.
    Object(WorkflowStepObjectInput),
}
221
/// Object form of a step in a workflow file.
#[derive(Debug, Deserialize)]
struct WorkflowStepObjectInput {
    /// Alternative spelling of the step id; used only when `step_id` is absent.
    #[serde(default)]
    id: Option<String>,
    /// Preferred spelling of the step id.
    #[serde(default)]
    step_id: Option<String>,
    /// Name of the action to run (required).
    action: String,
    /// Optional value accompanying the action.
    #[serde(default)]
    with: Option<Value>,
}
232
/// The two accepted layouts of a `hooks` value.
///
/// Untagged: the shape of the value selects the variant, with variants tried
/// in declaration order.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum HookFileShape {
    /// Mapping from event name to the actions for that event.
    Map(HashMap<String, Vec<HookActionInput>>),
    /// Explicit list of binding entries.
    List(Vec<HookBindingInput>),
}
239
/// One entry of the list form of a `hooks` value.
#[derive(Debug, Deserialize)]
struct HookBindingInput {
    /// Alternative spelling of the binding id; used only when `binding_id`
    /// is absent.
    #[serde(default)]
    id: Option<String>,
    /// Preferred spelling of the binding id.
    #[serde(default)]
    binding_id: Option<String>,
    /// Alternative spelling of the workflow id; used only when `workflow_id`
    /// is absent.
    #[serde(default)]
    workflow: Option<String>,
    /// Preferred spelling of the workflow id.
    #[serde(default)]
    workflow_id: Option<String>,
    /// Name of the event (required).
    event: String,
    /// Enabled flag; `parse_hooks_value` treats a missing value as `true`.
    #[serde(default)]
    enabled: Option<bool>,
    /// Actions for the event; defaults to an empty list.
    #[serde(default)]
    actions: Vec<HookActionInput>,
}
256
/// A hook action as written in a file; converted to `WorkflowActionSpec` by
/// `to_action_spec`.
///
/// Untagged: the shape of the value selects the variant, with variants tried
/// in declaration order.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
enum HookActionInput {
    /// Shorthand form: just the action name.
    String(String),
    /// Full form: action name plus optional `with` value.
    Object(WorkflowActionSpec),
}
263
264pub fn load_registry(sources: &[WorkflowLoadSource]) -> anyhow::Result<WorkflowRegistry> {
265    let mut registry = WorkflowRegistry::default();
266    for source in sources {
267        load_source_into(&mut registry, source)?;
268    }
269    Ok(registry)
270}
271
272pub fn validate_registry(registry: &WorkflowRegistry) -> Vec<WorkflowValidationMessage> {
273    let mut messages = Vec::new();
274    for workflow in registry.workflows.values() {
275        if workflow.steps.is_empty()
276            && registry
277                .hooks
278                .iter()
279                .all(|hook| hook.workflow_id != workflow.workflow_id)
280        {
281            messages.push(WorkflowValidationMessage {
282                severity: WorkflowValidationSeverity::Warning,
283                message: format!(
284                    "workflow `{}` has no steps and no hook bindings",
285                    workflow.workflow_id
286                ),
287            });
288        }
289        for step in &workflow.steps {
290            if step.action.trim().is_empty() {
291                messages.push(WorkflowValidationMessage {
292                    severity: WorkflowValidationSeverity::Error,
293                    message: format!(
294                        "workflow `{}` has step `{}` with empty action",
295                        workflow.workflow_id, step.step_id
296                    ),
297                });
298            }
299        }
300    }
301    for hook in &registry.hooks {
302        if !registry.workflows.contains_key(&hook.workflow_id) {
303            messages.push(WorkflowValidationMessage {
304                severity: WorkflowValidationSeverity::Error,
305                message: format!(
306                    "hook `{}` references unknown workflow `{}`",
307                    hook.binding_id, hook.workflow_id
308                ),
309            });
310        }
311        if hook.actions.is_empty() {
312            messages.push(WorkflowValidationMessage {
313                severity: WorkflowValidationSeverity::Warning,
314                message: format!("hook `{}` has no actions", hook.binding_id),
315            });
316        }
317    }
318    messages
319}
320
321fn load_source_into(
322    registry: &mut WorkflowRegistry,
323    source: &WorkflowLoadSource,
324) -> anyhow::Result<()> {
325    for entry in collect_yaml_files(&source.root.join("workflows"))? {
326        let workflow = load_workflow_file(&entry, source)?;
327        registry
328            .workflows
329            .insert(workflow.workflow_id.clone(), workflow.clone());
330        registry.hooks.retain(|hook| hook.workflow_id != workflow.workflow_id || !matches!(hook.source.as_ref(), Some(src) if src.path.as_deref() == Some(&entry.to_string_lossy().to_string())));
331        registry.hooks.extend(workflow.hooks.clone());
332    }
333    for entry in collect_yaml_files(&source.root.join("hooks"))? {
334        registry.hooks.extend(load_hook_file(&entry, source)?);
335    }
336    Ok(())
337}
338
339fn collect_yaml_files(dir: &Path) -> anyhow::Result<Vec<PathBuf>> {
340    let mut files = Vec::new();
341    let entries = match fs::read_dir(dir) {
342        Ok(entries) => entries,
343        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(files),
344        Err(err) => return Err(err.into()),
345    };
346    for entry in entries {
347        let path = entry?.path();
348        if path.is_dir() {
349            files.extend(collect_yaml_files(&path)?);
350            continue;
351        }
352        let ext = path
353            .extension()
354            .and_then(|v| v.to_str())
355            .unwrap_or_default();
356        if matches!(ext, "yaml" | "yml") {
357            files.push(path);
358        }
359    }
360    files.sort();
361    Ok(files)
362}
363
364fn load_workflow_file(path: &Path, source: &WorkflowLoadSource) -> anyhow::Result<WorkflowSpec> {
365    let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
366    let parsed = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
367        .with_context(|| format!("parse workflow yaml {}", path.display()))?;
368    let workflow = parsed
369        .workflow
370        .ok_or_else(|| anyhow::anyhow!("missing `workflow` key"))?;
371    let workflow_id = workflow
372        .workflow_id
373        .or(workflow.id)
374        .or_else(|| {
375            path.file_stem()
376                .and_then(|v| v.to_str())
377                .map(ToString::to_string)
378        })
379        .ok_or_else(|| anyhow::anyhow!("workflow id missing"))?;
380    let name = workflow.name.clone().unwrap_or_else(|| workflow_id.clone());
381    let source_ref = source_ref(source, path);
382    let steps = workflow
383        .steps
384        .into_iter()
385        .enumerate()
386        .map(|(idx, step)| match step {
387            WorkflowStepInput::String(action) => WorkflowStepSpec {
388                step_id: format!("step_{}", idx + 1),
389                action,
390                with: None,
391                knowledge: KnowledgeBinding::default(),
392            },
393            WorkflowStepInput::Object(step) => WorkflowStepSpec {
394                step_id: step
395                    .step_id
396                    .or(step.id)
397                    .unwrap_or_else(|| format!("step_{}", idx + 1)),
398                action: step.action,
399                with: step.with,
400                knowledge: KnowledgeBinding::default(),
401            },
402        })
403        .collect::<Vec<_>>();
404    let mut hooks = parse_hooks_value(
405        workflow.hooks.as_ref().or(parsed.hooks.as_ref()),
406        &workflow_id,
407        &source_ref,
408    )?;
409    for hook in &mut hooks {
410        if hook.workflow_id.is_empty() {
411            hook.workflow_id = workflow_id.clone();
412        }
413    }
414    Ok(WorkflowSpec {
415        workflow_id,
416        name,
417        description: workflow.description,
418        enabled: workflow.enabled.unwrap_or(true),
419        knowledge: KnowledgeBinding::default(),
420        steps,
421        hooks,
422        source: Some(source_ref),
423    })
424}
425
426fn load_hook_file(
427    path: &Path,
428    source: &WorkflowLoadSource,
429) -> anyhow::Result<Vec<WorkflowHookBinding>> {
430    let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
431    let env = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
432        .with_context(|| format!("parse hook yaml {}", path.display()))?;
433    parse_hooks_value(env.hooks.as_ref(), "", &source_ref(source, path))
434}
435
436fn parse_hooks_value(
437    hooks_value: Option<&Value>,
438    default_workflow_id: &str,
439    source_ref: &WorkflowSourceRef,
440) -> anyhow::Result<Vec<WorkflowHookBinding>> {
441    let Some(hooks_value) = hooks_value else {
442        return Ok(Vec::new());
443    };
444    let shape = serde_json::from_value::<HookFileShape>(hooks_value.clone())
445        .or_else(|_| serde_yaml::from_value::<HookFileShape>(serde_yaml::to_value(hooks_value)?))
446        .context("parse hooks")?;
447    let mut out = Vec::new();
448    match shape {
449        HookFileShape::Map(map) => {
450            for (event, actions) in map {
451                out.push(WorkflowHookBinding {
452                    binding_id: format!(
453                        "{}.{}",
454                        default_workflow_id_or_default(default_workflow_id),
455                        normalize_ident(&event)
456                    ),
457                    workflow_id: default_workflow_id.to_string(),
458                    event,
459                    enabled: true,
460                    actions: actions.into_iter().map(to_action_spec).collect(),
461                    source: Some(source_ref.clone()),
462                });
463            }
464        }
465        HookFileShape::List(items) => {
466            for item in items {
467                out.push(WorkflowHookBinding {
468                    binding_id: item.binding_id.or(item.id).unwrap_or_else(|| {
469                        format!(
470                            "{}.{}",
471                            item.workflow_id
472                                .clone()
473                                .or(item.workflow.clone())
474                                .unwrap_or_else(|| default_workflow_id_or_default(
475                                    default_workflow_id
476                                )),
477                            normalize_ident(&item.event)
478                        )
479                    }),
480                    workflow_id: item
481                        .workflow_id
482                        .or(item.workflow)
483                        .unwrap_or_else(|| default_workflow_id.to_string()),
484                    event: item.event,
485                    enabled: item.enabled.unwrap_or(true),
486                    actions: item.actions.into_iter().map(to_action_spec).collect(),
487                    source: Some(source_ref.clone()),
488                });
489            }
490        }
491    }
492    Ok(out)
493}
494
/// Returns `workflow_id` as an owned string, or the literal `"workflow"` when
/// the id is empty or consists only of whitespace.
///
/// A non-blank id is returned exactly as given; surrounding whitespace is
/// not trimmed.
fn default_workflow_id_or_default(workflow_id: &str) -> String {
    let is_blank = workflow_id.trim().is_empty();
    String::from(if is_blank { "workflow" } else { workflow_id })
}
502
503fn to_action_spec(input: HookActionInput) -> WorkflowActionSpec {
504    match input {
505        HookActionInput::String(action) => WorkflowActionSpec { action, with: None },
506        HookActionInput::Object(spec) => spec,
507    }
508}
509
/// Turns free-form text into an identifier fragment.
///
/// Leading and trailing whitespace is removed, ASCII letters are lowercased,
/// and each space, `/` or `.` is replaced by `_`. All other characters
/// (including non-ASCII letters and interior tabs) are kept as they are.
fn normalize_ident(input: &str) -> String {
    input
        .trim()
        .chars()
        .map(|ch| match ch {
            ' ' | '/' | '.' => '_',
            other => other.to_ascii_lowercase(),
        })
        .collect()
}
516
517fn source_ref(source: &WorkflowLoadSource, path: &Path) -> WorkflowSourceRef {
518    WorkflowSourceRef {
519        kind: source.kind.clone(),
520        pack_id: source.pack_id.clone(),
521        path: Some(path.to_string_lossy().to_string()),
522    }
523}
524
/// Serde default provider for boolean fields that should be `true` when the
/// key is missing (referenced by name via `#[serde(default = "default_true")]`).
fn default_true() -> bool {
    true
}
528
// Unit tests for the on-disk loader. Each test builds a throwaway source
// directory with `tempfile` and loads it through the public entry point.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // A workflow file that mixes shorthand (string) and object forms for both
    // steps and hook actions, with hooks nested under the `workflow` key.
    #[test]
    fn loads_workflow_with_embedded_hooks() {
        let dir = tempdir().expect("dir");
        let workflows_dir = dir.path().join("workflows");
        fs::create_dir_all(&workflows_dir).expect("mkdir");
        // The file is named `demo.yaml` but declares `id: build_feature`, so
        // the explicit id must win over the file stem.
        fs::write(
            workflows_dir.join("demo.yaml"),
            r#"
workflow:
  id: build_feature
  name: Build Feature
  steps:
    - planner
    - action: verifier.run
      with:
        strict: true
  hooks:
    task_created:
      - kanban.update
      - action: slack.notify
        with:
          channel: engineering
"#,
        )
        .expect("write");
        let registry = load_registry(&[WorkflowLoadSource {
            root: dir.path().to_path_buf(),
            kind: WorkflowSourceKind::Workspace,
            pack_id: None,
        }])
        .expect("registry");
        // Looked up by the declared id, not by the file name.
        let workflow = registry.workflows.get("build_feature").expect("workflow");
        assert_eq!(workflow.steps.len(), 2);
        // One event in the map form yields exactly one binding carrying both actions.
        assert_eq!(registry.hooks.len(), 1);
        assert_eq!(registry.hooks[0].actions.len(), 2);
    }
}