Skip to main content

tandem_workflows/
lib.rs

1use anyhow::Context;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5use std::fs;
6use std::path::{Path, PathBuf};
7
8mod mission_builder;
9pub mod plan_package;
10
11pub use mission_builder::{
12    validate_mission_blueprint, ApprovalDecision, HumanApprovalGate, InputRefBlueprint,
13    MissionBlueprint, MissionMilestoneBlueprint, MissionPhaseBlueprint, MissionPhaseExecutionMode,
14    MissionTeamBlueprint, OutputContractBlueprint, ReviewStage, ReviewStageKind, ValidationMessage,
15    ValidationSeverity, WorkstreamBlueprint,
16};
17pub use plan_package::{
18    AutomationV2Schedule, AutomationV2ScheduleType, WorkflowPlan, WorkflowPlanChatMessage,
19    WorkflowPlanConversation, WorkflowPlanDraftRecord, WorkflowPlanStep,
20};
21
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
23#[serde(rename_all = "snake_case")]
24pub enum WorkflowSourceKind {
25    BuiltIn,
26    Pack,
27    Workspace,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
31pub struct WorkflowSourceRef {
32    pub kind: WorkflowSourceKind,
33    #[serde(default, skip_serializing_if = "Option::is_none")]
34    pub pack_id: Option<String>,
35    #[serde(default, skip_serializing_if = "Option::is_none")]
36    pub path: Option<String>,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
40pub struct WorkflowActionSpec {
41    pub action: String,
42    #[serde(default, skip_serializing_if = "Option::is_none")]
43    pub with: Option<Value>,
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
47pub struct WorkflowStepSpec {
48    pub step_id: String,
49    pub action: String,
50    #[serde(default, skip_serializing_if = "Option::is_none")]
51    pub with: Option<Value>,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
55pub struct WorkflowHookBinding {
56    pub binding_id: String,
57    pub workflow_id: String,
58    pub event: String,
59    #[serde(default = "default_true")]
60    pub enabled: bool,
61    #[serde(default)]
62    pub actions: Vec<WorkflowActionSpec>,
63    #[serde(default, skip_serializing_if = "Option::is_none")]
64    pub source: Option<WorkflowSourceRef>,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
68pub struct WorkflowSpec {
69    pub workflow_id: String,
70    pub name: String,
71    #[serde(default, skip_serializing_if = "Option::is_none")]
72    pub description: Option<String>,
73    #[serde(default = "default_true")]
74    pub enabled: bool,
75    #[serde(default)]
76    pub steps: Vec<WorkflowStepSpec>,
77    #[serde(default)]
78    pub hooks: Vec<WorkflowHookBinding>,
79    #[serde(default, skip_serializing_if = "Option::is_none")]
80    pub source: Option<WorkflowSourceRef>,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
84pub struct WorkflowRegistry {
85    #[serde(default)]
86    pub workflows: HashMap<String, WorkflowSpec>,
87    #[serde(default)]
88    pub hooks: Vec<WorkflowHookBinding>,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
92#[serde(rename_all = "snake_case")]
93pub enum WorkflowRunStatus {
94    Queued,
95    Running,
96    Completed,
97    Failed,
98    DryRun,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
102#[serde(rename_all = "snake_case")]
103pub enum WorkflowActionRunStatus {
104    Pending,
105    Running,
106    Completed,
107    Failed,
108    Skipped,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
112pub struct WorkflowActionRunRecord {
113    pub action_id: String,
114    pub action: String,
115    #[serde(default, skip_serializing_if = "Option::is_none")]
116    pub task_id: Option<String>,
117    pub status: WorkflowActionRunStatus,
118    #[serde(default, skip_serializing_if = "Option::is_none")]
119    pub detail: Option<String>,
120    #[serde(default, skip_serializing_if = "Option::is_none")]
121    pub output: Option<Value>,
122    pub updated_at_ms: u64,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
126pub struct WorkflowRunRecord {
127    pub run_id: String,
128    pub workflow_id: String,
129    #[serde(default, skip_serializing_if = "Option::is_none")]
130    pub automation_id: Option<String>,
131    #[serde(default, skip_serializing_if = "Option::is_none")]
132    pub automation_run_id: Option<String>,
133    #[serde(default, skip_serializing_if = "Option::is_none")]
134    pub binding_id: Option<String>,
135    #[serde(default, skip_serializing_if = "Option::is_none")]
136    pub trigger_event: Option<String>,
137    #[serde(default, skip_serializing_if = "Option::is_none")]
138    pub source_event_id: Option<String>,
139    #[serde(default, skip_serializing_if = "Option::is_none")]
140    pub task_id: Option<String>,
141    pub status: WorkflowRunStatus,
142    pub created_at_ms: u64,
143    pub updated_at_ms: u64,
144    #[serde(default, skip_serializing_if = "Option::is_none")]
145    pub finished_at_ms: Option<u64>,
146    #[serde(default)]
147    pub actions: Vec<WorkflowActionRunRecord>,
148    #[serde(default, skip_serializing_if = "Option::is_none")]
149    pub source: Option<WorkflowSourceRef>,
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
153pub struct WorkflowSimulationResult {
154    #[serde(default)]
155    pub matched_bindings: Vec<WorkflowHookBinding>,
156    #[serde(default)]
157    pub planned_actions: Vec<WorkflowActionSpec>,
158    #[serde(default)]
159    pub canonical_events: Vec<String>,
160}
161
162#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
163#[serde(rename_all = "snake_case")]
164pub enum WorkflowValidationSeverity {
165    Info,
166    Warning,
167    Error,
168}
169
170#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
171pub struct WorkflowValidationMessage {
172    pub severity: WorkflowValidationSeverity,
173    pub message: String,
174}
175
176#[derive(Debug, Clone)]
177pub struct WorkflowLoadSource {
178    pub root: PathBuf,
179    pub kind: WorkflowSourceKind,
180    pub pack_id: Option<String>,
181}
182
183#[derive(Debug, Deserialize)]
184struct WorkflowFileEnvelope {
185    #[serde(default)]
186    workflow: Option<WorkflowFileShape>,
187    #[serde(default)]
188    hooks: Option<Value>,
189}
190
191#[derive(Debug, Deserialize)]
192struct WorkflowFileShape {
193    #[serde(default)]
194    id: Option<String>,
195    #[serde(default)]
196    workflow_id: Option<String>,
197    #[serde(default)]
198    name: Option<String>,
199    #[serde(default)]
200    description: Option<String>,
201    #[serde(default)]
202    enabled: Option<bool>,
203    #[serde(default)]
204    steps: Vec<WorkflowStepInput>,
205    #[serde(default)]
206    hooks: Option<Value>,
207}
208
209#[derive(Debug, Deserialize)]
210#[serde(untagged)]
211enum WorkflowStepInput {
212    String(String),
213    Object(WorkflowStepObjectInput),
214}
215
216#[derive(Debug, Deserialize)]
217struct WorkflowStepObjectInput {
218    #[serde(default)]
219    id: Option<String>,
220    #[serde(default)]
221    step_id: Option<String>,
222    action: String,
223    #[serde(default)]
224    with: Option<Value>,
225}
226
227#[derive(Debug, Deserialize)]
228#[serde(untagged)]
229enum HookFileShape {
230    Map(HashMap<String, Vec<HookActionInput>>),
231    List(Vec<HookBindingInput>),
232}
233
234#[derive(Debug, Deserialize)]
235struct HookBindingInput {
236    #[serde(default)]
237    id: Option<String>,
238    #[serde(default)]
239    binding_id: Option<String>,
240    #[serde(default)]
241    workflow: Option<String>,
242    #[serde(default)]
243    workflow_id: Option<String>,
244    event: String,
245    #[serde(default)]
246    enabled: Option<bool>,
247    #[serde(default)]
248    actions: Vec<HookActionInput>,
249}
250
251#[derive(Debug, Clone, Deserialize)]
252#[serde(untagged)]
253enum HookActionInput {
254    String(String),
255    Object(WorkflowActionSpec),
256}
257
258pub fn load_registry(sources: &[WorkflowLoadSource]) -> anyhow::Result<WorkflowRegistry> {
259    let mut registry = WorkflowRegistry::default();
260    for source in sources {
261        load_source_into(&mut registry, source)?;
262    }
263    Ok(registry)
264}
265
266pub fn validate_registry(registry: &WorkflowRegistry) -> Vec<WorkflowValidationMessage> {
267    let mut messages = Vec::new();
268    for workflow in registry.workflows.values() {
269        if workflow.steps.is_empty()
270            && registry
271                .hooks
272                .iter()
273                .all(|hook| hook.workflow_id != workflow.workflow_id)
274        {
275            messages.push(WorkflowValidationMessage {
276                severity: WorkflowValidationSeverity::Warning,
277                message: format!(
278                    "workflow `{}` has no steps and no hook bindings",
279                    workflow.workflow_id
280                ),
281            });
282        }
283        for step in &workflow.steps {
284            if step.action.trim().is_empty() {
285                messages.push(WorkflowValidationMessage {
286                    severity: WorkflowValidationSeverity::Error,
287                    message: format!(
288                        "workflow `{}` has step `{}` with empty action",
289                        workflow.workflow_id, step.step_id
290                    ),
291                });
292            }
293        }
294    }
295    for hook in &registry.hooks {
296        if !registry.workflows.contains_key(&hook.workflow_id) {
297            messages.push(WorkflowValidationMessage {
298                severity: WorkflowValidationSeverity::Error,
299                message: format!(
300                    "hook `{}` references unknown workflow `{}`",
301                    hook.binding_id, hook.workflow_id
302                ),
303            });
304        }
305        if hook.actions.is_empty() {
306            messages.push(WorkflowValidationMessage {
307                severity: WorkflowValidationSeverity::Warning,
308                message: format!("hook `{}` has no actions", hook.binding_id),
309            });
310        }
311    }
312    messages
313}
314
315fn load_source_into(
316    registry: &mut WorkflowRegistry,
317    source: &WorkflowLoadSource,
318) -> anyhow::Result<()> {
319    for entry in collect_yaml_files(&source.root.join("workflows"))? {
320        let workflow = load_workflow_file(&entry, source)?;
321        registry
322            .workflows
323            .insert(workflow.workflow_id.clone(), workflow.clone());
324        registry.hooks.retain(|hook| hook.workflow_id != workflow.workflow_id || !matches!(hook.source.as_ref(), Some(src) if src.path.as_deref() == Some(&entry.to_string_lossy().to_string())));
325        registry.hooks.extend(workflow.hooks.clone());
326    }
327    for entry in collect_yaml_files(&source.root.join("hooks"))? {
328        registry.hooks.extend(load_hook_file(&entry, source)?);
329    }
330    Ok(())
331}
332
333fn collect_yaml_files(dir: &Path) -> anyhow::Result<Vec<PathBuf>> {
334    let mut files = Vec::new();
335    let entries = match fs::read_dir(dir) {
336        Ok(entries) => entries,
337        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(files),
338        Err(err) => return Err(err.into()),
339    };
340    for entry in entries {
341        let path = entry?.path();
342        if path.is_dir() {
343            files.extend(collect_yaml_files(&path)?);
344            continue;
345        }
346        let ext = path
347            .extension()
348            .and_then(|v| v.to_str())
349            .unwrap_or_default();
350        if matches!(ext, "yaml" | "yml") {
351            files.push(path);
352        }
353    }
354    files.sort();
355    Ok(files)
356}
357
358fn load_workflow_file(path: &Path, source: &WorkflowLoadSource) -> anyhow::Result<WorkflowSpec> {
359    let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
360    let parsed = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
361        .with_context(|| format!("parse workflow yaml {}", path.display()))?;
362    let workflow = parsed
363        .workflow
364        .ok_or_else(|| anyhow::anyhow!("missing `workflow` key"))?;
365    let workflow_id = workflow
366        .workflow_id
367        .or(workflow.id)
368        .or_else(|| {
369            path.file_stem()
370                .and_then(|v| v.to_str())
371                .map(ToString::to_string)
372        })
373        .ok_or_else(|| anyhow::anyhow!("workflow id missing"))?;
374    let name = workflow.name.clone().unwrap_or_else(|| workflow_id.clone());
375    let source_ref = source_ref(source, path);
376    let steps = workflow
377        .steps
378        .into_iter()
379        .enumerate()
380        .map(|(idx, step)| match step {
381            WorkflowStepInput::String(action) => WorkflowStepSpec {
382                step_id: format!("step_{}", idx + 1),
383                action,
384                with: None,
385            },
386            WorkflowStepInput::Object(step) => WorkflowStepSpec {
387                step_id: step
388                    .step_id
389                    .or(step.id)
390                    .unwrap_or_else(|| format!("step_{}", idx + 1)),
391                action: step.action,
392                with: step.with,
393            },
394        })
395        .collect::<Vec<_>>();
396    let mut hooks = parse_hooks_value(
397        workflow.hooks.as_ref().or(parsed.hooks.as_ref()),
398        &workflow_id,
399        &source_ref,
400    )?;
401    for hook in &mut hooks {
402        if hook.workflow_id.is_empty() {
403            hook.workflow_id = workflow_id.clone();
404        }
405    }
406    Ok(WorkflowSpec {
407        workflow_id,
408        name,
409        description: workflow.description,
410        enabled: workflow.enabled.unwrap_or(true),
411        steps,
412        hooks,
413        source: Some(source_ref),
414    })
415}
416
417fn load_hook_file(
418    path: &Path,
419    source: &WorkflowLoadSource,
420) -> anyhow::Result<Vec<WorkflowHookBinding>> {
421    let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
422    let env = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
423        .with_context(|| format!("parse hook yaml {}", path.display()))?;
424    parse_hooks_value(env.hooks.as_ref(), "", &source_ref(source, path))
425}
426
427fn parse_hooks_value(
428    hooks_value: Option<&Value>,
429    default_workflow_id: &str,
430    source_ref: &WorkflowSourceRef,
431) -> anyhow::Result<Vec<WorkflowHookBinding>> {
432    let Some(hooks_value) = hooks_value else {
433        return Ok(Vec::new());
434    };
435    let shape = serde_json::from_value::<HookFileShape>(hooks_value.clone())
436        .or_else(|_| serde_yaml::from_value::<HookFileShape>(serde_yaml::to_value(hooks_value)?))
437        .context("parse hooks")?;
438    let mut out = Vec::new();
439    match shape {
440        HookFileShape::Map(map) => {
441            for (event, actions) in map {
442                out.push(WorkflowHookBinding {
443                    binding_id: format!(
444                        "{}.{}",
445                        default_workflow_id_or_default(default_workflow_id),
446                        normalize_ident(&event)
447                    ),
448                    workflow_id: default_workflow_id.to_string(),
449                    event,
450                    enabled: true,
451                    actions: actions.into_iter().map(to_action_spec).collect(),
452                    source: Some(source_ref.clone()),
453                });
454            }
455        }
456        HookFileShape::List(items) => {
457            for item in items {
458                out.push(WorkflowHookBinding {
459                    binding_id: item.binding_id.or(item.id).unwrap_or_else(|| {
460                        format!(
461                            "{}.{}",
462                            item.workflow_id
463                                .clone()
464                                .or(item.workflow.clone())
465                                .unwrap_or_else(|| default_workflow_id_or_default(
466                                    default_workflow_id
467                                )),
468                            normalize_ident(&item.event)
469                        )
470                    }),
471                    workflow_id: item
472                        .workflow_id
473                        .or(item.workflow)
474                        .unwrap_or_else(|| default_workflow_id.to_string()),
475                    event: item.event,
476                    enabled: item.enabled.unwrap_or(true),
477                    actions: item.actions.into_iter().map(to_action_spec).collect(),
478                    source: Some(source_ref.clone()),
479                });
480            }
481        }
482    }
483    Ok(out)
484}
485
486fn default_workflow_id_or_default(workflow_id: &str) -> String {
487    if workflow_id.trim().is_empty() {
488        "workflow".to_string()
489    } else {
490        workflow_id.to_string()
491    }
492}
493
494fn to_action_spec(input: HookActionInput) -> WorkflowActionSpec {
495    match input {
496        HookActionInput::String(action) => WorkflowActionSpec { action, with: None },
497        HookActionInput::Object(spec) => spec,
498    }
499}
500
501fn normalize_ident(input: &str) -> String {
502    input
503        .trim()
504        .to_ascii_lowercase()
505        .replace([' ', '/', '.'], "_")
506}
507
508fn source_ref(source: &WorkflowLoadSource, path: &Path) -> WorkflowSourceRef {
509    WorkflowSourceRef {
510        kind: source.kind.clone(),
511        pack_id: source.pack_id.clone(),
512        path: Some(path.to_string_lossy().to_string()),
513    }
514}
515
516fn default_true() -> bool {
517    true
518}
519
520#[cfg(test)]
521mod tests {
522    use super::*;
523    use tempfile::tempdir;
524
525    #[test]
526    fn loads_workflow_with_embedded_hooks() {
527        let dir = tempdir().expect("dir");
528        let workflows_dir = dir.path().join("workflows");
529        fs::create_dir_all(&workflows_dir).expect("mkdir");
530        fs::write(
531            workflows_dir.join("demo.yaml"),
532            r#"
533workflow:
534  id: build_feature
535  name: Build Feature
536  steps:
537    - planner
538    - action: verifier.run
539      with:
540        strict: true
541  hooks:
542    task_created:
543      - kanban.update
544      - action: slack.notify
545        with:
546          channel: engineering
547"#,
548        )
549        .expect("write");
550        let registry = load_registry(&[WorkflowLoadSource {
551            root: dir.path().to_path_buf(),
552            kind: WorkflowSourceKind::Workspace,
553            pack_id: None,
554        }])
555        .expect("registry");
556        let workflow = registry.workflows.get("build_feature").expect("workflow");
557        assert_eq!(workflow.steps.len(), 2);
558        assert_eq!(registry.hooks.len(), 1);
559        assert_eq!(registry.hooks[0].actions.len(), 2);
560    }
561}