Skip to main content

tandem_workflows/
lib.rs

1use anyhow::Context;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5use std::fs;
6use std::path::{Path, PathBuf};
7
8mod mission_builder;
9
10pub use mission_builder::{
11    validate_mission_blueprint, ApprovalDecision, HumanApprovalGate, InputRefBlueprint,
12    MissionBlueprint, MissionMilestoneBlueprint, MissionPhaseBlueprint, MissionPhaseExecutionMode,
13    MissionTeamBlueprint, OutputContractBlueprint, ReviewStage, ReviewStageKind, ValidationMessage,
14    ValidationSeverity, WorkstreamBlueprint,
15};
16
/// Category of location a workflow definition or hook binding came from.
///
/// Variants are (de)serialized in `snake_case`, e.g. `BuiltIn` <-> `built_in`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowSourceKind {
    /// Serialized as `built_in`.
    BuiltIn,
    /// Serialized as `pack`; presumably used together with a `pack_id` — confirm with callers.
    Pack,
    /// Serialized as `workspace`.
    Workspace,
}
24
/// Provenance record attached to workflows, hook bindings and run records.
///
/// Both optional fields are omitted from serialized output when `None` and
/// default to `None` when absent on input.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowSourceRef {
    /// Kind of source the item was loaded from.
    pub kind: WorkflowSourceKind,
    /// Pack identifier copied from the load source, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pack_id: Option<String>,
    /// File path of the defining file; the loader in this file stores the
    /// lossy UTF-8 rendering of the path.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
}
33
/// A single action to run, as listed on a hook binding or in a simulation plan.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowActionSpec {
    /// Action name (for example `slack.notify` in this file's test fixture).
    pub action: String,
    /// Optional free-form JSON arguments for the action; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub with: Option<Value>,
}
40
/// One step of a workflow.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowStepSpec {
    /// Step identifier; the file loader generates `step_<n>` (1-based) when
    /// the YAML does not supply one.
    pub step_id: String,
    /// Action name executed by this step.
    pub action: String,
    /// Optional free-form JSON arguments for the action; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub with: Option<Value>,
}
48
/// Binds an event name to a list of actions on behalf of a workflow.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowHookBinding {
    /// Identifier of this binding; the loader derives
    /// `<workflow>.<normalized event>` when none is given.
    pub binding_id: String,
    /// Id of the workflow this binding belongs to. May be empty for bindings
    /// loaded from standalone hook files that do not name a workflow.
    pub workflow_id: String,
    /// Event name the binding reacts to.
    pub event: String,
    /// Whether the binding is active; defaults to `true` when absent on input.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Actions associated with the event; defaults to empty.
    #[serde(default)]
    pub actions: Vec<WorkflowActionSpec>,
    /// Where the binding was loaded from; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<WorkflowSourceRef>,
}
61
/// A fully resolved workflow definition.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowSpec {
    /// Identifier of the workflow; the loader uses it as the registry map key.
    pub workflow_id: String,
    /// Display name; the loader falls back to the workflow id when the file has none.
    pub name: String,
    /// Optional description; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Whether the workflow is active; defaults to `true` when absent on input.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Steps in declaration order; defaults to empty.
    #[serde(default)]
    pub steps: Vec<WorkflowStepSpec>,
    /// Hook bindings declared in the same file as the workflow; defaults to empty.
    #[serde(default)]
    pub hooks: Vec<WorkflowHookBinding>,
    /// Where the workflow was loaded from; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<WorkflowSourceRef>,
}
77
/// In-memory collection of loaded workflows and hook bindings.
///
/// `Default` yields an empty registry.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct WorkflowRegistry {
    /// Workflows by key; the loader in this file keys entries by `workflow_id`.
    #[serde(default)]
    pub workflows: HashMap<String, WorkflowSpec>,
    /// Flat list of hook bindings, combining those embedded in workflow files
    /// with those from standalone hook files.
    #[serde(default)]
    pub hooks: Vec<WorkflowHookBinding>,
}
85
/// Status of a workflow run as a whole.
///
/// Variants are (de)serialized in `snake_case`, e.g. `DryRun` <-> `dry_run`.
/// Nothing in this file transitions between states; the lifecycle is defined
/// by whichever code produces `WorkflowRunRecord`s.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowRunStatus {
    /// Serialized as `queued`.
    Queued,
    /// Serialized as `running`.
    Running,
    /// Serialized as `completed`.
    Completed,
    /// Serialized as `failed`.
    Failed,
    /// Serialized as `dry_run`.
    DryRun,
}
95
/// Status of a single action inside a workflow run.
///
/// Variants are (de)serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowActionRunStatus {
    /// Serialized as `pending`.
    Pending,
    /// Serialized as `running`.
    Running,
    /// Serialized as `completed`.
    Completed,
    /// Serialized as `failed`.
    Failed,
    /// Serialized as `skipped`.
    Skipped,
}
105
/// Record of one action's execution within a workflow run.
///
/// Optional fields are omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowActionRunRecord {
    /// Identifier of this action run.
    pub action_id: String,
    /// Name of the action that was run.
    pub action: String,
    /// Associated task id, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// Current status of the action.
    pub status: WorkflowActionRunStatus,
    /// Optional human-readable detail text.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Optional free-form JSON output of the action.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output: Option<Value>,
    /// Last-update timestamp; presumably milliseconds since the Unix epoch — confirm with the producer.
    pub updated_at_ms: u64,
}
119
/// Record of one workflow run, including its per-action records.
///
/// Optional fields are omitted from serialized output when `None`.
/// The `*_ms` timestamps are presumably milliseconds since the Unix epoch —
/// confirm with the code that produces these records.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowRunRecord {
    /// Identifier of this run.
    pub run_id: String,
    /// Id of the workflow that was run.
    pub workflow_id: String,
    /// Id of the hook binding associated with the run, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub binding_id: Option<String>,
    /// Name of the triggering event, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trigger_event: Option<String>,
    /// Identifier of the originating event, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_event_id: Option<String>,
    /// Associated task id, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// Overall status of the run.
    pub status: WorkflowRunStatus,
    /// Creation timestamp.
    pub created_at_ms: u64,
    /// Last-update timestamp.
    pub updated_at_ms: u64,
    /// Completion timestamp, absent while the run has not finished.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    /// Per-action records; defaults to empty.
    #[serde(default)]
    pub actions: Vec<WorkflowActionRunRecord>,
    /// Provenance of the workflow definition, if recorded.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<WorkflowSourceRef>,
}
142
/// Result of simulating an event against the registry.
///
/// Nothing in this file constructs this type; field meanings below are read
/// from the names and types only — confirm against the simulator.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowSimulationResult {
    /// Hook bindings that matched; defaults to empty.
    #[serde(default)]
    pub matched_bindings: Vec<WorkflowHookBinding>,
    /// Actions that would be run; defaults to empty.
    #[serde(default)]
    pub planned_actions: Vec<WorkflowActionSpec>,
    /// Event names in canonical form; defaults to empty.
    #[serde(default)]
    pub canonical_events: Vec<String>,
}
152
/// Severity attached to a registry validation message.
///
/// Variants are (de)serialized in `snake_case`. `validate_registry` in this
/// file emits only `Warning` and `Error`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowValidationSeverity {
    /// Serialized as `info`.
    Info,
    /// Serialized as `warning`.
    Warning,
    /// Serialized as `error`.
    Error,
}
160
/// One finding produced by `validate_registry`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowValidationMessage {
    /// How serious the finding is.
    pub severity: WorkflowValidationSeverity,
    /// Human-readable description of the finding.
    pub message: String,
}
166
/// A directory tree to load workflows and hooks from, plus its provenance.
#[derive(Debug, Clone)]
pub struct WorkflowLoadSource {
    /// Root directory; the loader scans its `workflows/` and `hooks/` subdirectories.
    pub root: PathBuf,
    /// Source kind copied into every `WorkflowSourceRef` produced from this source.
    pub kind: WorkflowSourceKind,
    /// Pack id copied into every `WorkflowSourceRef` produced from this source.
    pub pack_id: Option<String>,
}
173
/// Top-level shape of a YAML file, used for both workflow files and hook files.
#[derive(Debug, Deserialize)]
struct WorkflowFileEnvelope {
    /// Contents of the top-level `workflow` key; required by `load_workflow_file`,
    /// ignored by `load_hook_file`.
    #[serde(default)]
    workflow: Option<WorkflowFileShape>,
    /// Raw contents of the top-level `hooks` key, interpreted later by `parse_hooks_value`.
    #[serde(default)]
    hooks: Option<Value>,
}
181
/// Shape of the `workflow` mapping inside a workflow YAML file.
///
/// Every field is optional on input; `load_workflow_file` fills in defaults.
#[derive(Debug, Deserialize)]
struct WorkflowFileShape {
    /// Alternate spelling of the workflow id; used only when `workflow_id` is absent.
    #[serde(default)]
    id: Option<String>,
    /// Workflow id; takes precedence over `id`.
    #[serde(default)]
    workflow_id: Option<String>,
    /// Display name; the loader falls back to the workflow id.
    #[serde(default)]
    name: Option<String>,
    /// Optional description, passed through unchanged.
    #[serde(default)]
    description: Option<String>,
    /// Enabled flag; the loader treats absence as `true`.
    #[serde(default)]
    enabled: Option<bool>,
    /// Steps in file order, each either a bare action string or an object.
    #[serde(default)]
    steps: Vec<WorkflowStepInput>,
    /// Raw hooks nested under `workflow`; preferred over the envelope-level `hooks`.
    #[serde(default)]
    hooks: Option<Value>,
}
199
/// A step as written in YAML: either a bare action name or a full object.
///
/// Untagged, so the variant is chosen from the value's shape.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum WorkflowStepInput {
    /// Bare string; the string is the action name.
    String(String),
    /// Mapping with an `action` and optional id / arguments.
    Object(WorkflowStepObjectInput),
}
206
/// Object form of a workflow step in YAML.
#[derive(Debug, Deserialize)]
struct WorkflowStepObjectInput {
    /// Alternate spelling of the step id; used only when `step_id` is absent.
    #[serde(default)]
    id: Option<String>,
    /// Step id; takes precedence over `id`.
    #[serde(default)]
    step_id: Option<String>,
    /// Action name (required).
    action: String,
    /// Optional free-form arguments for the action.
    #[serde(default)]
    with: Option<Value>,
}
217
/// The two accepted layouts of a `hooks` value.
///
/// Untagged, so the variant is chosen from the value's shape.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum HookFileShape {
    /// Mapping from event name to the actions for that event.
    Map(HashMap<String, Vec<HookActionInput>>),
    /// Sequence of explicit binding objects.
    List(Vec<HookBindingInput>),
}
224
/// Explicit hook binding as written in the list form of `hooks`.
#[derive(Debug, Deserialize)]
struct HookBindingInput {
    /// Alternate spelling of the binding id; used only when `binding_id` is absent.
    #[serde(default)]
    id: Option<String>,
    /// Binding id; takes precedence over `id`.
    #[serde(default)]
    binding_id: Option<String>,
    /// Alternate spelling of the workflow id; used only when `workflow_id` is absent.
    #[serde(default)]
    workflow: Option<String>,
    /// Workflow id; takes precedence over `workflow`.
    #[serde(default)]
    workflow_id: Option<String>,
    /// Event name (required).
    event: String,
    /// Enabled flag; the parser treats absence as `true`.
    #[serde(default)]
    enabled: Option<bool>,
    /// Actions for the event; defaults to empty.
    #[serde(default)]
    actions: Vec<HookActionInput>,
}
241
/// A hook action as written in YAML: either a bare action name or a full spec.
///
/// Untagged, so the variant is chosen from the value's shape.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
enum HookActionInput {
    /// Bare string; the string is the action name.
    String(String),
    /// Mapping with `action` and optional `with`.
    Object(WorkflowActionSpec),
}
248
249pub fn load_registry(sources: &[WorkflowLoadSource]) -> anyhow::Result<WorkflowRegistry> {
250    let mut registry = WorkflowRegistry::default();
251    for source in sources {
252        load_source_into(&mut registry, source)?;
253    }
254    Ok(registry)
255}
256
257pub fn validate_registry(registry: &WorkflowRegistry) -> Vec<WorkflowValidationMessage> {
258    let mut messages = Vec::new();
259    for workflow in registry.workflows.values() {
260        if workflow.steps.is_empty()
261            && registry
262                .hooks
263                .iter()
264                .all(|hook| hook.workflow_id != workflow.workflow_id)
265        {
266            messages.push(WorkflowValidationMessage {
267                severity: WorkflowValidationSeverity::Warning,
268                message: format!(
269                    "workflow `{}` has no steps and no hook bindings",
270                    workflow.workflow_id
271                ),
272            });
273        }
274        for step in &workflow.steps {
275            if step.action.trim().is_empty() {
276                messages.push(WorkflowValidationMessage {
277                    severity: WorkflowValidationSeverity::Error,
278                    message: format!(
279                        "workflow `{}` has step `{}` with empty action",
280                        workflow.workflow_id, step.step_id
281                    ),
282                });
283            }
284        }
285    }
286    for hook in &registry.hooks {
287        if !registry.workflows.contains_key(&hook.workflow_id) {
288            messages.push(WorkflowValidationMessage {
289                severity: WorkflowValidationSeverity::Error,
290                message: format!(
291                    "hook `{}` references unknown workflow `{}`",
292                    hook.binding_id, hook.workflow_id
293                ),
294            });
295        }
296        if hook.actions.is_empty() {
297            messages.push(WorkflowValidationMessage {
298                severity: WorkflowValidationSeverity::Warning,
299                message: format!("hook `{}` has no actions", hook.binding_id),
300            });
301        }
302    }
303    messages
304}
305
306fn load_source_into(
307    registry: &mut WorkflowRegistry,
308    source: &WorkflowLoadSource,
309) -> anyhow::Result<()> {
310    for entry in collect_yaml_files(&source.root.join("workflows"))? {
311        let workflow = load_workflow_file(&entry, source)?;
312        registry
313            .workflows
314            .insert(workflow.workflow_id.clone(), workflow.clone());
315        registry.hooks.retain(|hook| hook.workflow_id != workflow.workflow_id || !matches!(hook.source.as_ref(), Some(src) if src.path.as_deref() == Some(&entry.to_string_lossy().to_string())));
316        registry.hooks.extend(workflow.hooks.clone());
317    }
318    for entry in collect_yaml_files(&source.root.join("hooks"))? {
319        registry.hooks.extend(load_hook_file(&entry, source)?);
320    }
321    Ok(())
322}
323
324fn collect_yaml_files(dir: &Path) -> anyhow::Result<Vec<PathBuf>> {
325    let mut files = Vec::new();
326    let entries = match fs::read_dir(dir) {
327        Ok(entries) => entries,
328        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(files),
329        Err(err) => return Err(err.into()),
330    };
331    for entry in entries {
332        let path = entry?.path();
333        if path.is_dir() {
334            files.extend(collect_yaml_files(&path)?);
335            continue;
336        }
337        let ext = path
338            .extension()
339            .and_then(|v| v.to_str())
340            .unwrap_or_default();
341        if matches!(ext, "yaml" | "yml") {
342            files.push(path);
343        }
344    }
345    files.sort();
346    Ok(files)
347}
348
349fn load_workflow_file(path: &Path, source: &WorkflowLoadSource) -> anyhow::Result<WorkflowSpec> {
350    let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
351    let parsed = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
352        .with_context(|| format!("parse workflow yaml {}", path.display()))?;
353    let workflow = parsed
354        .workflow
355        .ok_or_else(|| anyhow::anyhow!("missing `workflow` key"))?;
356    let workflow_id = workflow
357        .workflow_id
358        .or(workflow.id)
359        .or_else(|| {
360            path.file_stem()
361                .and_then(|v| v.to_str())
362                .map(ToString::to_string)
363        })
364        .ok_or_else(|| anyhow::anyhow!("workflow id missing"))?;
365    let name = workflow.name.clone().unwrap_or_else(|| workflow_id.clone());
366    let source_ref = source_ref(source, path);
367    let steps = workflow
368        .steps
369        .into_iter()
370        .enumerate()
371        .map(|(idx, step)| match step {
372            WorkflowStepInput::String(action) => WorkflowStepSpec {
373                step_id: format!("step_{}", idx + 1),
374                action,
375                with: None,
376            },
377            WorkflowStepInput::Object(step) => WorkflowStepSpec {
378                step_id: step
379                    .step_id
380                    .or(step.id)
381                    .unwrap_or_else(|| format!("step_{}", idx + 1)),
382                action: step.action,
383                with: step.with,
384            },
385        })
386        .collect::<Vec<_>>();
387    let mut hooks = parse_hooks_value(
388        workflow.hooks.as_ref().or(parsed.hooks.as_ref()),
389        &workflow_id,
390        &source_ref,
391    )?;
392    for hook in &mut hooks {
393        if hook.workflow_id.is_empty() {
394            hook.workflow_id = workflow_id.clone();
395        }
396    }
397    Ok(WorkflowSpec {
398        workflow_id,
399        name,
400        description: workflow.description,
401        enabled: workflow.enabled.unwrap_or(true),
402        steps,
403        hooks,
404        source: Some(source_ref),
405    })
406}
407
408fn load_hook_file(
409    path: &Path,
410    source: &WorkflowLoadSource,
411) -> anyhow::Result<Vec<WorkflowHookBinding>> {
412    let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
413    let env = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
414        .with_context(|| format!("parse hook yaml {}", path.display()))?;
415    parse_hooks_value(env.hooks.as_ref(), "", &source_ref(source, path))
416}
417
418fn parse_hooks_value(
419    hooks_value: Option<&Value>,
420    default_workflow_id: &str,
421    source_ref: &WorkflowSourceRef,
422) -> anyhow::Result<Vec<WorkflowHookBinding>> {
423    let Some(hooks_value) = hooks_value else {
424        return Ok(Vec::new());
425    };
426    let shape = serde_json::from_value::<HookFileShape>(hooks_value.clone())
427        .or_else(|_| serde_yaml::from_value::<HookFileShape>(serde_yaml::to_value(hooks_value)?))
428        .context("parse hooks")?;
429    let mut out = Vec::new();
430    match shape {
431        HookFileShape::Map(map) => {
432            for (event, actions) in map {
433                out.push(WorkflowHookBinding {
434                    binding_id: format!(
435                        "{}.{}",
436                        default_workflow_id_or_default(default_workflow_id),
437                        normalize_ident(&event)
438                    ),
439                    workflow_id: default_workflow_id.to_string(),
440                    event,
441                    enabled: true,
442                    actions: actions.into_iter().map(to_action_spec).collect(),
443                    source: Some(source_ref.clone()),
444                });
445            }
446        }
447        HookFileShape::List(items) => {
448            for item in items {
449                out.push(WorkflowHookBinding {
450                    binding_id: item.binding_id.or(item.id).unwrap_or_else(|| {
451                        format!(
452                            "{}.{}",
453                            item.workflow_id
454                                .clone()
455                                .or(item.workflow.clone())
456                                .unwrap_or_else(|| default_workflow_id_or_default(
457                                    default_workflow_id
458                                )),
459                            normalize_ident(&item.event)
460                        )
461                    }),
462                    workflow_id: item
463                        .workflow_id
464                        .or(item.workflow)
465                        .unwrap_or_else(|| default_workflow_id.to_string()),
466                    event: item.event,
467                    enabled: item.enabled.unwrap_or(true),
468                    actions: item.actions.into_iter().map(to_action_spec).collect(),
469                    source: Some(source_ref.clone()),
470                });
471            }
472        }
473    }
474    Ok(out)
475}
476
/// Returns `workflow_id` as an owned string, or the literal `workflow` when
/// the id is empty or whitespace-only.
///
/// A non-blank id is returned exactly as given (it is not trimmed).
fn default_workflow_id_or_default(workflow_id: &str) -> String {
    let chosen = match workflow_id.trim() {
        "" => "workflow",
        _ => workflow_id,
    };
    chosen.to_string()
}
484
485fn to_action_spec(input: HookActionInput) -> WorkflowActionSpec {
486    match input {
487        HookActionInput::String(action) => WorkflowActionSpec { action, with: None },
488        HookActionInput::Object(spec) => spec,
489    }
490}
491
/// Turns an event name into an identifier fragment.
///
/// Surrounding whitespace is trimmed, ASCII letters are lowercased, and each
/// space, `/` or `.` is replaced by `_`. All other characters are kept as-is.
fn normalize_ident(input: &str) -> String {
    input
        .trim()
        .chars()
        .map(|ch| match ch {
            ' ' | '/' | '.' => '_',
            other => other.to_ascii_lowercase(),
        })
        .collect()
}
498
499fn source_ref(source: &WorkflowLoadSource, path: &Path) -> WorkflowSourceRef {
500    WorkflowSourceRef {
501        kind: source.kind.clone(),
502        pack_id: source.pack_id.clone(),
503        path: Some(path.to_string_lossy().to_string()),
504    }
505}
506
/// Serde default helper: supplies `true` for `enabled` fields that are absent on input.
fn default_true() -> bool {
    true
}
510
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Workflow file with two steps (one bare string, one object) and a
    /// map-form hook on `task_created` carrying two actions.
    const DEMO_WORKFLOW_YAML: &str = r#"
workflow:
  id: build_feature
  name: Build Feature
  steps:
    - planner
    - action: verifier.run
      with:
        strict: true
  hooks:
    task_created:
      - kanban.update
      - action: slack.notify
        with:
          channel: engineering
"#;

    /// Loading a workspace source registers the workflow under its `id` and
    /// surfaces its embedded hook as a single registry-level binding.
    #[test]
    fn loads_workflow_with_embedded_hooks() {
        let root = tempdir().expect("dir");
        let workflows_dir = root.path().join("workflows");
        fs::create_dir_all(&workflows_dir).expect("mkdir");
        fs::write(workflows_dir.join("demo.yaml"), DEMO_WORKFLOW_YAML).expect("write");

        let workspace = WorkflowLoadSource {
            root: root.path().to_path_buf(),
            kind: WorkflowSourceKind::Workspace,
            pack_id: None,
        };
        let registry = load_registry(&[workspace]).expect("registry");

        let workflow = registry.workflows.get("build_feature").expect("workflow");
        assert_eq!(workflow.steps.len(), 2);
        assert_eq!(registry.hooks.len(), 1);
        assert_eq!(registry.hooks[0].actions.len(), 2);
    }
}