Skip to main content

tandem_workflows/
lib.rs

1use anyhow::Context;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5use std::fs;
6use std::path::{Path, PathBuf};
7
8#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
9#[serde(rename_all = "snake_case")]
10pub enum WorkflowSourceKind {
11    BuiltIn,
12    Pack,
13    Workspace,
14}
15
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
17pub struct WorkflowSourceRef {
18    pub kind: WorkflowSourceKind,
19    #[serde(default, skip_serializing_if = "Option::is_none")]
20    pub pack_id: Option<String>,
21    #[serde(default, skip_serializing_if = "Option::is_none")]
22    pub path: Option<String>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
26pub struct WorkflowActionSpec {
27    pub action: String,
28    #[serde(default, skip_serializing_if = "Option::is_none")]
29    pub with: Option<Value>,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
33pub struct WorkflowStepSpec {
34    pub step_id: String,
35    pub action: String,
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub with: Option<Value>,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
41pub struct WorkflowHookBinding {
42    pub binding_id: String,
43    pub workflow_id: String,
44    pub event: String,
45    #[serde(default = "default_true")]
46    pub enabled: bool,
47    #[serde(default)]
48    pub actions: Vec<WorkflowActionSpec>,
49    #[serde(default, skip_serializing_if = "Option::is_none")]
50    pub source: Option<WorkflowSourceRef>,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
54pub struct WorkflowSpec {
55    pub workflow_id: String,
56    pub name: String,
57    #[serde(default, skip_serializing_if = "Option::is_none")]
58    pub description: Option<String>,
59    #[serde(default = "default_true")]
60    pub enabled: bool,
61    #[serde(default)]
62    pub steps: Vec<WorkflowStepSpec>,
63    #[serde(default)]
64    pub hooks: Vec<WorkflowHookBinding>,
65    #[serde(default, skip_serializing_if = "Option::is_none")]
66    pub source: Option<WorkflowSourceRef>,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
70pub struct WorkflowRegistry {
71    #[serde(default)]
72    pub workflows: HashMap<String, WorkflowSpec>,
73    #[serde(default)]
74    pub hooks: Vec<WorkflowHookBinding>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
78#[serde(rename_all = "snake_case")]
79pub enum WorkflowRunStatus {
80    Queued,
81    Running,
82    Completed,
83    Failed,
84    DryRun,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
88#[serde(rename_all = "snake_case")]
89pub enum WorkflowActionRunStatus {
90    Pending,
91    Running,
92    Completed,
93    Failed,
94    Skipped,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
98pub struct WorkflowActionRunRecord {
99    pub action_id: String,
100    pub action: String,
101    #[serde(default, skip_serializing_if = "Option::is_none")]
102    pub task_id: Option<String>,
103    pub status: WorkflowActionRunStatus,
104    #[serde(default, skip_serializing_if = "Option::is_none")]
105    pub detail: Option<String>,
106    #[serde(default, skip_serializing_if = "Option::is_none")]
107    pub output: Option<Value>,
108    pub updated_at_ms: u64,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
112pub struct WorkflowRunRecord {
113    pub run_id: String,
114    pub workflow_id: String,
115    #[serde(default, skip_serializing_if = "Option::is_none")]
116    pub binding_id: Option<String>,
117    #[serde(default, skip_serializing_if = "Option::is_none")]
118    pub trigger_event: Option<String>,
119    #[serde(default, skip_serializing_if = "Option::is_none")]
120    pub source_event_id: Option<String>,
121    #[serde(default, skip_serializing_if = "Option::is_none")]
122    pub task_id: Option<String>,
123    pub status: WorkflowRunStatus,
124    pub created_at_ms: u64,
125    pub updated_at_ms: u64,
126    #[serde(default, skip_serializing_if = "Option::is_none")]
127    pub finished_at_ms: Option<u64>,
128    #[serde(default)]
129    pub actions: Vec<WorkflowActionRunRecord>,
130    #[serde(default, skip_serializing_if = "Option::is_none")]
131    pub source: Option<WorkflowSourceRef>,
132}
133
134#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
135pub struct WorkflowSimulationResult {
136    #[serde(default)]
137    pub matched_bindings: Vec<WorkflowHookBinding>,
138    #[serde(default)]
139    pub planned_actions: Vec<WorkflowActionSpec>,
140    #[serde(default)]
141    pub canonical_events: Vec<String>,
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
145#[serde(rename_all = "snake_case")]
146pub enum WorkflowValidationSeverity {
147    Info,
148    Warning,
149    Error,
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
153pub struct WorkflowValidationMessage {
154    pub severity: WorkflowValidationSeverity,
155    pub message: String,
156}
157
158#[derive(Debug, Clone)]
159pub struct WorkflowLoadSource {
160    pub root: PathBuf,
161    pub kind: WorkflowSourceKind,
162    pub pack_id: Option<String>,
163}
164
165#[derive(Debug, Deserialize)]
166struct WorkflowFileEnvelope {
167    #[serde(default)]
168    workflow: Option<WorkflowFileShape>,
169    #[serde(default)]
170    hooks: Option<Value>,
171}
172
173#[derive(Debug, Deserialize)]
174struct WorkflowFileShape {
175    #[serde(default)]
176    id: Option<String>,
177    #[serde(default)]
178    workflow_id: Option<String>,
179    #[serde(default)]
180    name: Option<String>,
181    #[serde(default)]
182    description: Option<String>,
183    #[serde(default)]
184    enabled: Option<bool>,
185    #[serde(default)]
186    steps: Vec<WorkflowStepInput>,
187    #[serde(default)]
188    hooks: Option<Value>,
189}
190
191#[derive(Debug, Deserialize)]
192#[serde(untagged)]
193enum WorkflowStepInput {
194    String(String),
195    Object(WorkflowStepObjectInput),
196}
197
198#[derive(Debug, Deserialize)]
199struct WorkflowStepObjectInput {
200    #[serde(default)]
201    id: Option<String>,
202    #[serde(default)]
203    step_id: Option<String>,
204    action: String,
205    #[serde(default)]
206    with: Option<Value>,
207}
208
209#[derive(Debug, Deserialize)]
210#[serde(untagged)]
211enum HookFileShape {
212    Map(HashMap<String, Vec<HookActionInput>>),
213    List(Vec<HookBindingInput>),
214}
215
216#[derive(Debug, Deserialize)]
217struct HookBindingInput {
218    #[serde(default)]
219    id: Option<String>,
220    #[serde(default)]
221    binding_id: Option<String>,
222    #[serde(default)]
223    workflow: Option<String>,
224    #[serde(default)]
225    workflow_id: Option<String>,
226    event: String,
227    #[serde(default)]
228    enabled: Option<bool>,
229    #[serde(default)]
230    actions: Vec<HookActionInput>,
231}
232
233#[derive(Debug, Clone, Deserialize)]
234#[serde(untagged)]
235enum HookActionInput {
236    String(String),
237    Object(WorkflowActionSpec),
238}
239
240pub fn load_registry(sources: &[WorkflowLoadSource]) -> anyhow::Result<WorkflowRegistry> {
241    let mut registry = WorkflowRegistry::default();
242    for source in sources {
243        load_source_into(&mut registry, source)?;
244    }
245    Ok(registry)
246}
247
248pub fn validate_registry(registry: &WorkflowRegistry) -> Vec<WorkflowValidationMessage> {
249    let mut messages = Vec::new();
250    for workflow in registry.workflows.values() {
251        if workflow.steps.is_empty()
252            && registry
253                .hooks
254                .iter()
255                .all(|hook| hook.workflow_id != workflow.workflow_id)
256        {
257            messages.push(WorkflowValidationMessage {
258                severity: WorkflowValidationSeverity::Warning,
259                message: format!(
260                    "workflow `{}` has no steps and no hook bindings",
261                    workflow.workflow_id
262                ),
263            });
264        }
265        for step in &workflow.steps {
266            if step.action.trim().is_empty() {
267                messages.push(WorkflowValidationMessage {
268                    severity: WorkflowValidationSeverity::Error,
269                    message: format!(
270                        "workflow `{}` has step `{}` with empty action",
271                        workflow.workflow_id, step.step_id
272                    ),
273                });
274            }
275        }
276    }
277    for hook in &registry.hooks {
278        if !registry.workflows.contains_key(&hook.workflow_id) {
279            messages.push(WorkflowValidationMessage {
280                severity: WorkflowValidationSeverity::Error,
281                message: format!(
282                    "hook `{}` references unknown workflow `{}`",
283                    hook.binding_id, hook.workflow_id
284                ),
285            });
286        }
287        if hook.actions.is_empty() {
288            messages.push(WorkflowValidationMessage {
289                severity: WorkflowValidationSeverity::Warning,
290                message: format!("hook `{}` has no actions", hook.binding_id),
291            });
292        }
293    }
294    messages
295}
296
297fn load_source_into(
298    registry: &mut WorkflowRegistry,
299    source: &WorkflowLoadSource,
300) -> anyhow::Result<()> {
301    for entry in collect_yaml_files(&source.root.join("workflows"))? {
302        let workflow = load_workflow_file(&entry, source)?;
303        registry
304            .workflows
305            .insert(workflow.workflow_id.clone(), workflow.clone());
306        registry.hooks.retain(|hook| hook.workflow_id != workflow.workflow_id || !matches!(hook.source.as_ref(), Some(src) if src.path.as_deref() == Some(&entry.to_string_lossy().to_string())));
307        registry.hooks.extend(workflow.hooks.clone());
308    }
309    for entry in collect_yaml_files(&source.root.join("hooks"))? {
310        registry.hooks.extend(load_hook_file(&entry, source)?);
311    }
312    Ok(())
313}
314
315fn collect_yaml_files(dir: &Path) -> anyhow::Result<Vec<PathBuf>> {
316    let mut files = Vec::new();
317    let entries = match fs::read_dir(dir) {
318        Ok(entries) => entries,
319        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(files),
320        Err(err) => return Err(err.into()),
321    };
322    for entry in entries {
323        let path = entry?.path();
324        if path.is_dir() {
325            files.extend(collect_yaml_files(&path)?);
326            continue;
327        }
328        let ext = path
329            .extension()
330            .and_then(|v| v.to_str())
331            .unwrap_or_default();
332        if matches!(ext, "yaml" | "yml") {
333            files.push(path);
334        }
335    }
336    files.sort();
337    Ok(files)
338}
339
340fn load_workflow_file(path: &Path, source: &WorkflowLoadSource) -> anyhow::Result<WorkflowSpec> {
341    let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
342    let parsed = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
343        .with_context(|| format!("parse workflow yaml {}", path.display()))?;
344    let workflow = parsed
345        .workflow
346        .ok_or_else(|| anyhow::anyhow!("missing `workflow` key"))?;
347    let workflow_id = workflow
348        .workflow_id
349        .or(workflow.id)
350        .or_else(|| {
351            path.file_stem()
352                .and_then(|v| v.to_str())
353                .map(ToString::to_string)
354        })
355        .ok_or_else(|| anyhow::anyhow!("workflow id missing"))?;
356    let name = workflow.name.clone().unwrap_or_else(|| workflow_id.clone());
357    let source_ref = source_ref(source, path);
358    let steps = workflow
359        .steps
360        .into_iter()
361        .enumerate()
362        .map(|(idx, step)| match step {
363            WorkflowStepInput::String(action) => WorkflowStepSpec {
364                step_id: format!("step_{}", idx + 1),
365                action,
366                with: None,
367            },
368            WorkflowStepInput::Object(step) => WorkflowStepSpec {
369                step_id: step
370                    .step_id
371                    .or(step.id)
372                    .unwrap_or_else(|| format!("step_{}", idx + 1)),
373                action: step.action,
374                with: step.with,
375            },
376        })
377        .collect::<Vec<_>>();
378    let mut hooks = parse_hooks_value(
379        workflow.hooks.as_ref().or(parsed.hooks.as_ref()),
380        &workflow_id,
381        &source_ref,
382    )?;
383    for hook in &mut hooks {
384        if hook.workflow_id.is_empty() {
385            hook.workflow_id = workflow_id.clone();
386        }
387    }
388    Ok(WorkflowSpec {
389        workflow_id,
390        name,
391        description: workflow.description,
392        enabled: workflow.enabled.unwrap_or(true),
393        steps,
394        hooks,
395        source: Some(source_ref),
396    })
397}
398
399fn load_hook_file(
400    path: &Path,
401    source: &WorkflowLoadSource,
402) -> anyhow::Result<Vec<WorkflowHookBinding>> {
403    let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
404    let env = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
405        .with_context(|| format!("parse hook yaml {}", path.display()))?;
406    parse_hooks_value(env.hooks.as_ref(), "", &source_ref(source, path))
407}
408
409fn parse_hooks_value(
410    hooks_value: Option<&Value>,
411    default_workflow_id: &str,
412    source_ref: &WorkflowSourceRef,
413) -> anyhow::Result<Vec<WorkflowHookBinding>> {
414    let Some(hooks_value) = hooks_value else {
415        return Ok(Vec::new());
416    };
417    let shape = serde_json::from_value::<HookFileShape>(hooks_value.clone())
418        .or_else(|_| serde_yaml::from_value::<HookFileShape>(serde_yaml::to_value(hooks_value)?))
419        .context("parse hooks")?;
420    let mut out = Vec::new();
421    match shape {
422        HookFileShape::Map(map) => {
423            for (event, actions) in map {
424                out.push(WorkflowHookBinding {
425                    binding_id: format!(
426                        "{}.{}",
427                        default_workflow_id_or_default(default_workflow_id),
428                        normalize_ident(&event)
429                    ),
430                    workflow_id: default_workflow_id.to_string(),
431                    event,
432                    enabled: true,
433                    actions: actions.into_iter().map(to_action_spec).collect(),
434                    source: Some(source_ref.clone()),
435                });
436            }
437        }
438        HookFileShape::List(items) => {
439            for item in items {
440                out.push(WorkflowHookBinding {
441                    binding_id: item.binding_id.or(item.id).unwrap_or_else(|| {
442                        format!(
443                            "{}.{}",
444                            item.workflow_id
445                                .clone()
446                                .or(item.workflow.clone())
447                                .unwrap_or_else(|| default_workflow_id_or_default(
448                                    default_workflow_id
449                                )),
450                            normalize_ident(&item.event)
451                        )
452                    }),
453                    workflow_id: item
454                        .workflow_id
455                        .or(item.workflow)
456                        .unwrap_or_else(|| default_workflow_id.to_string()),
457                    event: item.event,
458                    enabled: item.enabled.unwrap_or(true),
459                    actions: item.actions.into_iter().map(to_action_spec).collect(),
460                    source: Some(source_ref.clone()),
461                });
462            }
463        }
464    }
465    Ok(out)
466}
467
468fn default_workflow_id_or_default(workflow_id: &str) -> String {
469    if workflow_id.trim().is_empty() {
470        "workflow".to_string()
471    } else {
472        workflow_id.to_string()
473    }
474}
475
476fn to_action_spec(input: HookActionInput) -> WorkflowActionSpec {
477    match input {
478        HookActionInput::String(action) => WorkflowActionSpec { action, with: None },
479        HookActionInput::Object(spec) => spec,
480    }
481}
482
483fn normalize_ident(input: &str) -> String {
484    input
485        .trim()
486        .to_ascii_lowercase()
487        .replace([' ', '/', '.'], "_")
488}
489
490fn source_ref(source: &WorkflowLoadSource, path: &Path) -> WorkflowSourceRef {
491    WorkflowSourceRef {
492        kind: source.kind.clone(),
493        pack_id: source.pack_id.clone(),
494        path: Some(path.to_string_lossy().to_string()),
495    }
496}
497
498fn default_true() -> bool {
499    true
500}
501
502#[cfg(test)]
503mod tests {
504    use super::*;
505    use tempfile::tempdir;
506
507    #[test]
508    fn loads_workflow_with_embedded_hooks() {
509        let dir = tempdir().expect("dir");
510        let workflows_dir = dir.path().join("workflows");
511        fs::create_dir_all(&workflows_dir).expect("mkdir");
512        fs::write(
513            workflows_dir.join("demo.yaml"),
514            r#"
515workflow:
516  id: build_feature
517  name: Build Feature
518  steps:
519    - planner
520    - action: verifier.run
521      with:
522        strict: true
523  hooks:
524    task_created:
525      - kanban.update
526      - action: slack.notify
527        with:
528          channel: engineering
529"#,
530        )
531        .expect("write");
532        let registry = load_registry(&[WorkflowLoadSource {
533            root: dir.path().to_path_buf(),
534            kind: WorkflowSourceKind::Workspace,
535            pack_id: None,
536        }])
537        .expect("registry");
538        let workflow = registry.workflows.get("build_feature").expect("workflow");
539        assert_eq!(workflow.steps.len(), 2);
540        assert_eq!(registry.hooks.len(), 1);
541        assert_eq!(registry.hooks[0].actions.len(), 2);
542    }
543}