Skip to main content

tandem_workflows/
lib.rs

1use anyhow::Context;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5use std::fs;
6use std::path::{Path, PathBuf};
7
8mod mission_builder;
9
10pub use mission_builder::{
11    validate_mission_blueprint, ApprovalDecision, HumanApprovalGate, InputRefBlueprint,
12    MissionBlueprint, MissionMilestoneBlueprint, MissionPhaseBlueprint, MissionPhaseExecutionMode,
13    MissionTeamBlueprint, OutputContractBlueprint, ReviewStage, ReviewStageKind, ValidationMessage,
14    ValidationSeverity, WorkstreamBlueprint,
15};
16
/// Category of the source a workflow or hook definition came from.
///
/// Serialized in `snake_case` (`built_in`, `pack`, `workspace`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowSourceKind {
    BuiltIn,
    Pack,
    Workspace,
}
24
/// Provenance of a loaded workflow or hook binding.
///
/// The loader in this file fills `path` with the lossy string form of the
/// definition file's path and copies `pack_id` from the load source.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowSourceRef {
    /// Kind of source the definition was loaded from.
    pub kind: WorkflowSourceKind,
    /// Pack identifier, when one was supplied; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pack_id: Option<String>,
    /// Path of the definition file; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
}
33
/// A single action attached to a hook binding.
///
/// Hook actions written as bare strings in a definition file are converted to
/// this type with `with` set to `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowActionSpec {
    /// Name of the action.
    pub action: String,
    /// Optional free-form arguments for the action; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub with: Option<Value>,
}
40
/// One step of a workflow.
///
/// When a definition file gives no step id, the loader in this file assigns
/// `step_<n>` using the step's 1-based position.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowStepSpec {
    /// Identifier of the step.
    pub step_id: String,
    /// Name of the action the step refers to.
    pub action: String,
    /// Optional free-form arguments for the action; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub with: Option<Value>,
}
48
/// Associates an event name with a list of actions on behalf of a workflow.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowHookBinding {
    /// Identifier of this binding.
    pub binding_id: String,
    /// Identifier of the workflow the binding refers to.
    pub workflow_id: String,
    /// Name of the event the binding is attached to.
    pub event: String,
    /// Whether the binding is enabled; defaults to `true` when absent from input.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Actions listed for the binding; defaults to empty when absent from input.
    #[serde(default)]
    pub actions: Vec<WorkflowActionSpec>,
    /// Where the binding was loaded from; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<WorkflowSourceRef>,
}
61
/// A workflow definition: identity, steps, and the hook bindings embedded in it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowSpec {
    /// Identifier of the workflow; the registry in this file keys workflows by it.
    pub workflow_id: String,
    /// Display name; the file loader falls back to `workflow_id` when none is given.
    pub name: String,
    /// Optional description; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Whether the workflow is enabled; defaults to `true` when absent from input.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Steps of the workflow; defaults to empty when absent from input.
    #[serde(default)]
    pub steps: Vec<WorkflowStepSpec>,
    /// Hook bindings declared alongside the workflow; defaults to empty.
    #[serde(default)]
    pub hooks: Vec<WorkflowHookBinding>,
    /// Where the workflow was loaded from; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<WorkflowSourceRef>,
}
77
/// Collection of loaded workflows and hook bindings.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct WorkflowRegistry {
    /// Workflows keyed by their `workflow_id`.
    #[serde(default)]
    pub workflows: HashMap<String, WorkflowSpec>,
    /// All hook bindings: those embedded in workflow files plus those from hook files.
    #[serde(default)]
    pub hooks: Vec<WorkflowHookBinding>,
}
85
/// Status of a workflow run as a whole.
///
/// Serialized in `snake_case` (e.g. `dry_run`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowRunStatus {
    Queued,
    Running,
    Completed,
    Failed,
    DryRun,
}
95
/// Status of a single action within a workflow run.
///
/// Serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowActionRunStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Skipped,
}
105
/// Record of one action's execution inside a workflow run.
///
/// Optional fields are omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowActionRunRecord {
    /// Identifier of this action instance.
    pub action_id: String,
    /// Name of the action.
    pub action: String,
    /// Identifier of an associated task, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// Current status of the action.
    pub status: WorkflowActionRunStatus,
    /// Optional free-text detail about the action's state.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Optional free-form output value of the action.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output: Option<Value>,
    /// Last-update timestamp; presumably milliseconds since the Unix epoch — confirm
    /// against the code that writes these records.
    pub updated_at_ms: u64,
}
119
/// Record of a workflow run and the actions executed within it.
///
/// Optional fields are omitted from serialized output when `None`. The
/// `*_at_ms` fields are `u64` timestamps; presumably milliseconds since the
/// Unix epoch — confirm against the code that writes these records.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowRunRecord {
    /// Identifier of this run.
    pub run_id: String,
    /// Identifier of the workflow that was run.
    pub workflow_id: String,
    /// Identifier of an associated automation, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub automation_id: Option<String>,
    /// Identifier of an associated automation run, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub automation_run_id: Option<String>,
    /// Identifier of the hook binding associated with the run, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub binding_id: Option<String>,
    /// Name of the event that triggered the run, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trigger_event: Option<String>,
    /// Identifier of the originating event, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_event_id: Option<String>,
    /// Identifier of an associated task, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// Current status of the run.
    pub status: WorkflowRunStatus,
    /// Creation timestamp.
    pub created_at_ms: u64,
    /// Last-update timestamp.
    pub updated_at_ms: u64,
    /// Completion timestamp, when set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    /// Per-action records; defaults to empty when absent from input.
    #[serde(default)]
    pub actions: Vec<WorkflowActionRunRecord>,
    /// Provenance of the workflow definition, if known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<WorkflowSourceRef>,
}
146
/// Outcome of a workflow simulation.
///
/// NOTE(review): nothing in this file produces this type; the field descriptions
/// below follow the field names and types only — confirm against the simulator.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowSimulationResult {
    /// Hook bindings reported as matched; defaults to empty.
    #[serde(default)]
    pub matched_bindings: Vec<WorkflowHookBinding>,
    /// Actions reported as planned; defaults to empty.
    #[serde(default)]
    pub planned_actions: Vec<WorkflowActionSpec>,
    /// Event names reported in canonical form; defaults to empty.
    #[serde(default)]
    pub canonical_events: Vec<String>,
}
156
/// Severity attached to a registry validation message.
///
/// Serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowValidationSeverity {
    Info,
    Warning,
    Error,
}
164
/// A single finding produced by `validate_registry`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowValidationMessage {
    /// How serious the finding is.
    pub severity: WorkflowValidationSeverity,
    /// Human-readable description of the finding.
    pub message: String,
}
170
/// A directory to load workflow and hook definitions from.
///
/// The loader scans the `workflows` and `hooks` subdirectories of `root`.
#[derive(Debug, Clone)]
pub struct WorkflowLoadSource {
    /// Directory containing the `workflows` and/or `hooks` subdirectories.
    pub root: PathBuf,
    /// Source kind recorded on everything loaded from this root.
    pub kind: WorkflowSourceKind,
    /// Pack identifier recorded on everything loaded from this root, if any.
    pub pack_id: Option<String>,
}
177
/// Top-level shape of a workflow or hook YAML file.
///
/// Workflow files must provide `workflow`; hook files are read for `hooks` only.
#[derive(Debug, Deserialize)]
struct WorkflowFileEnvelope {
    /// The workflow definition, when the file declares one.
    #[serde(default)]
    workflow: Option<WorkflowFileShape>,
    /// Raw top-level hooks value; for workflow files it is used only when the
    /// workflow itself declares no `hooks`.
    #[serde(default)]
    hooks: Option<Value>,
}
185
/// Contents of the `workflow` key in a workflow file; every field is optional.
#[derive(Debug, Deserialize)]
struct WorkflowFileShape {
    /// Workflow identifier; used when `workflow_id` is absent.
    #[serde(default)]
    id: Option<String>,
    /// Workflow identifier; takes precedence over `id`.
    #[serde(default)]
    workflow_id: Option<String>,
    /// Display name; the loader falls back to the resolved workflow id.
    #[serde(default)]
    name: Option<String>,
    /// Optional description.
    #[serde(default)]
    description: Option<String>,
    /// Enabled flag; the loader treats an absent value as `true`.
    #[serde(default)]
    enabled: Option<bool>,
    /// Steps, each either a bare action string or an object.
    #[serde(default)]
    steps: Vec<WorkflowStepInput>,
    /// Raw hooks value embedded in the workflow; takes precedence over top-level hooks.
    #[serde(default)]
    hooks: Option<Value>,
}
203
/// A step as written in a workflow file.
///
/// Untagged: serde tries the variants in declaration order.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum WorkflowStepInput {
    /// A bare string, taken as the action name.
    String(String),
    /// An object with an action and optional id / arguments.
    Object(WorkflowStepObjectInput),
}
210
/// Object form of a step in a workflow file.
#[derive(Debug, Deserialize)]
struct WorkflowStepObjectInput {
    /// Step identifier; used when `step_id` is absent.
    #[serde(default)]
    id: Option<String>,
    /// Step identifier; takes precedence over `id`.
    #[serde(default)]
    step_id: Option<String>,
    /// Name of the action (required).
    action: String,
    /// Optional free-form arguments for the action.
    #[serde(default)]
    with: Option<Value>,
}
221
/// Accepted shapes of a `hooks` value.
///
/// Untagged: serde tries the variants in declaration order.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum HookFileShape {
    /// Mapping from event name to the actions for that event.
    Map(HashMap<String, Vec<HookActionInput>>),
    /// List of explicit binding entries.
    List(Vec<HookBindingInput>),
}
228
/// One entry of the list form of a `hooks` value.
#[derive(Debug, Deserialize)]
struct HookBindingInput {
    /// Binding identifier; used when `binding_id` is absent.
    #[serde(default)]
    id: Option<String>,
    /// Binding identifier; takes precedence over `id`.
    #[serde(default)]
    binding_id: Option<String>,
    /// Workflow identifier; used when `workflow_id` is absent.
    #[serde(default)]
    workflow: Option<String>,
    /// Workflow identifier; takes precedence over `workflow`.
    #[serde(default)]
    workflow_id: Option<String>,
    /// Name of the event (required).
    event: String,
    /// Enabled flag; the parser treats an absent value as `true`.
    #[serde(default)]
    enabled: Option<bool>,
    /// Actions for the binding, each a bare string or an object.
    #[serde(default)]
    actions: Vec<HookActionInput>,
}
245
/// A hook action as written in a definition file.
///
/// Untagged: serde tries the variants in declaration order.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
enum HookActionInput {
    /// A bare string, taken as the action name.
    String(String),
    /// A full action object.
    Object(WorkflowActionSpec),
}
252
253pub fn load_registry(sources: &[WorkflowLoadSource]) -> anyhow::Result<WorkflowRegistry> {
254    let mut registry = WorkflowRegistry::default();
255    for source in sources {
256        load_source_into(&mut registry, source)?;
257    }
258    Ok(registry)
259}
260
261pub fn validate_registry(registry: &WorkflowRegistry) -> Vec<WorkflowValidationMessage> {
262    let mut messages = Vec::new();
263    for workflow in registry.workflows.values() {
264        if workflow.steps.is_empty()
265            && registry
266                .hooks
267                .iter()
268                .all(|hook| hook.workflow_id != workflow.workflow_id)
269        {
270            messages.push(WorkflowValidationMessage {
271                severity: WorkflowValidationSeverity::Warning,
272                message: format!(
273                    "workflow `{}` has no steps and no hook bindings",
274                    workflow.workflow_id
275                ),
276            });
277        }
278        for step in &workflow.steps {
279            if step.action.trim().is_empty() {
280                messages.push(WorkflowValidationMessage {
281                    severity: WorkflowValidationSeverity::Error,
282                    message: format!(
283                        "workflow `{}` has step `{}` with empty action",
284                        workflow.workflow_id, step.step_id
285                    ),
286                });
287            }
288        }
289    }
290    for hook in &registry.hooks {
291        if !registry.workflows.contains_key(&hook.workflow_id) {
292            messages.push(WorkflowValidationMessage {
293                severity: WorkflowValidationSeverity::Error,
294                message: format!(
295                    "hook `{}` references unknown workflow `{}`",
296                    hook.binding_id, hook.workflow_id
297                ),
298            });
299        }
300        if hook.actions.is_empty() {
301            messages.push(WorkflowValidationMessage {
302                severity: WorkflowValidationSeverity::Warning,
303                message: format!("hook `{}` has no actions", hook.binding_id),
304            });
305        }
306    }
307    messages
308}
309
310fn load_source_into(
311    registry: &mut WorkflowRegistry,
312    source: &WorkflowLoadSource,
313) -> anyhow::Result<()> {
314    for entry in collect_yaml_files(&source.root.join("workflows"))? {
315        let workflow = load_workflow_file(&entry, source)?;
316        registry
317            .workflows
318            .insert(workflow.workflow_id.clone(), workflow.clone());
319        registry.hooks.retain(|hook| hook.workflow_id != workflow.workflow_id || !matches!(hook.source.as_ref(), Some(src) if src.path.as_deref() == Some(&entry.to_string_lossy().to_string())));
320        registry.hooks.extend(workflow.hooks.clone());
321    }
322    for entry in collect_yaml_files(&source.root.join("hooks"))? {
323        registry.hooks.extend(load_hook_file(&entry, source)?);
324    }
325    Ok(())
326}
327
328fn collect_yaml_files(dir: &Path) -> anyhow::Result<Vec<PathBuf>> {
329    let mut files = Vec::new();
330    let entries = match fs::read_dir(dir) {
331        Ok(entries) => entries,
332        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(files),
333        Err(err) => return Err(err.into()),
334    };
335    for entry in entries {
336        let path = entry?.path();
337        if path.is_dir() {
338            files.extend(collect_yaml_files(&path)?);
339            continue;
340        }
341        let ext = path
342            .extension()
343            .and_then(|v| v.to_str())
344            .unwrap_or_default();
345        if matches!(ext, "yaml" | "yml") {
346            files.push(path);
347        }
348    }
349    files.sort();
350    Ok(files)
351}
352
353fn load_workflow_file(path: &Path, source: &WorkflowLoadSource) -> anyhow::Result<WorkflowSpec> {
354    let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
355    let parsed = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
356        .with_context(|| format!("parse workflow yaml {}", path.display()))?;
357    let workflow = parsed
358        .workflow
359        .ok_or_else(|| anyhow::anyhow!("missing `workflow` key"))?;
360    let workflow_id = workflow
361        .workflow_id
362        .or(workflow.id)
363        .or_else(|| {
364            path.file_stem()
365                .and_then(|v| v.to_str())
366                .map(ToString::to_string)
367        })
368        .ok_or_else(|| anyhow::anyhow!("workflow id missing"))?;
369    let name = workflow.name.clone().unwrap_or_else(|| workflow_id.clone());
370    let source_ref = source_ref(source, path);
371    let steps = workflow
372        .steps
373        .into_iter()
374        .enumerate()
375        .map(|(idx, step)| match step {
376            WorkflowStepInput::String(action) => WorkflowStepSpec {
377                step_id: format!("step_{}", idx + 1),
378                action,
379                with: None,
380            },
381            WorkflowStepInput::Object(step) => WorkflowStepSpec {
382                step_id: step
383                    .step_id
384                    .or(step.id)
385                    .unwrap_or_else(|| format!("step_{}", idx + 1)),
386                action: step.action,
387                with: step.with,
388            },
389        })
390        .collect::<Vec<_>>();
391    let mut hooks = parse_hooks_value(
392        workflow.hooks.as_ref().or(parsed.hooks.as_ref()),
393        &workflow_id,
394        &source_ref,
395    )?;
396    for hook in &mut hooks {
397        if hook.workflow_id.is_empty() {
398            hook.workflow_id = workflow_id.clone();
399        }
400    }
401    Ok(WorkflowSpec {
402        workflow_id,
403        name,
404        description: workflow.description,
405        enabled: workflow.enabled.unwrap_or(true),
406        steps,
407        hooks,
408        source: Some(source_ref),
409    })
410}
411
412fn load_hook_file(
413    path: &Path,
414    source: &WorkflowLoadSource,
415) -> anyhow::Result<Vec<WorkflowHookBinding>> {
416    let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
417    let env = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
418        .with_context(|| format!("parse hook yaml {}", path.display()))?;
419    parse_hooks_value(env.hooks.as_ref(), "", &source_ref(source, path))
420}
421
422fn parse_hooks_value(
423    hooks_value: Option<&Value>,
424    default_workflow_id: &str,
425    source_ref: &WorkflowSourceRef,
426) -> anyhow::Result<Vec<WorkflowHookBinding>> {
427    let Some(hooks_value) = hooks_value else {
428        return Ok(Vec::new());
429    };
430    let shape = serde_json::from_value::<HookFileShape>(hooks_value.clone())
431        .or_else(|_| serde_yaml::from_value::<HookFileShape>(serde_yaml::to_value(hooks_value)?))
432        .context("parse hooks")?;
433    let mut out = Vec::new();
434    match shape {
435        HookFileShape::Map(map) => {
436            for (event, actions) in map {
437                out.push(WorkflowHookBinding {
438                    binding_id: format!(
439                        "{}.{}",
440                        default_workflow_id_or_default(default_workflow_id),
441                        normalize_ident(&event)
442                    ),
443                    workflow_id: default_workflow_id.to_string(),
444                    event,
445                    enabled: true,
446                    actions: actions.into_iter().map(to_action_spec).collect(),
447                    source: Some(source_ref.clone()),
448                });
449            }
450        }
451        HookFileShape::List(items) => {
452            for item in items {
453                out.push(WorkflowHookBinding {
454                    binding_id: item.binding_id.or(item.id).unwrap_or_else(|| {
455                        format!(
456                            "{}.{}",
457                            item.workflow_id
458                                .clone()
459                                .or(item.workflow.clone())
460                                .unwrap_or_else(|| default_workflow_id_or_default(
461                                    default_workflow_id
462                                )),
463                            normalize_ident(&item.event)
464                        )
465                    }),
466                    workflow_id: item
467                        .workflow_id
468                        .or(item.workflow)
469                        .unwrap_or_else(|| default_workflow_id.to_string()),
470                    event: item.event,
471                    enabled: item.enabled.unwrap_or(true),
472                    actions: item.actions.into_iter().map(to_action_spec).collect(),
473                    source: Some(source_ref.clone()),
474                });
475            }
476        }
477    }
478    Ok(out)
479}
480
/// Returns `workflow_id` unchanged, or the literal `workflow` when it is blank.
///
/// "Blank" means empty or whitespace-only. A non-blank id is returned exactly
/// as given, including any surrounding whitespace.
fn default_workflow_id_or_default(workflow_id: &str) -> String {
    match workflow_id.trim() {
        "" => String::from("workflow"),
        _ => workflow_id.to_owned(),
    }
}
488
489fn to_action_spec(input: HookActionInput) -> WorkflowActionSpec {
490    match input {
491        HookActionInput::String(action) => WorkflowActionSpec { action, with: None },
492        HookActionInput::Object(spec) => spec,
493    }
494}
495
/// Normalizes an event name for use inside a generated binding id.
///
/// Surrounding whitespace is trimmed, ASCII letters are lower-cased, and each
/// space, `/` or `.` is replaced by `_`. All other characters are kept as-is.
fn normalize_ident(input: &str) -> String {
    input
        .trim()
        .chars()
        .map(|ch| match ch {
            ' ' | '/' | '.' => '_',
            other => other.to_ascii_lowercase(),
        })
        .collect()
}
502
503fn source_ref(source: &WorkflowLoadSource, path: &Path) -> WorkflowSourceRef {
504    WorkflowSourceRef {
505        kind: source.kind.clone(),
506        pack_id: source.pack_id.clone(),
507        path: Some(path.to_string_lossy().to_string()),
508    }
509}
510
/// Serde default provider for the `enabled` fields: a value absent from input
/// deserializes as `true`.
fn default_true() -> bool {
    true
}
514
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Loads a single workflow file that mixes bare-string and object steps and
    // declares its hooks in map form, then checks the resulting registry.
    #[test]
    fn loads_workflow_with_embedded_hooks() {
        let dir = tempdir().expect("dir");
        let workflows_dir = dir.path().join("workflows");
        fs::create_dir_all(&workflows_dir).expect("mkdir");
        fs::write(
            workflows_dir.join("demo.yaml"),
            r#"
workflow:
  id: build_feature
  name: Build Feature
  steps:
    - planner
    - action: verifier.run
      with:
        strict: true
  hooks:
    task_created:
      - kanban.update
      - action: slack.notify
        with:
          channel: engineering
"#,
        )
        .expect("write");
        let registry = load_registry(&[WorkflowLoadSource {
            root: dir.path().to_path_buf(),
            kind: WorkflowSourceKind::Workspace,
            pack_id: None,
        }])
        .expect("registry");
        // The workflow is keyed by the declared `id`, not by the file stem `demo`.
        let workflow = registry.workflows.get("build_feature").expect("workflow");
        assert_eq!(workflow.steps.len(), 2);
        // One event in the hooks map yields one binding carrying both actions.
        assert_eq!(registry.hooks.len(), 1);
        assert_eq!(registry.hooks[0].actions.len(), 2);
    }
}
556}