1use anyhow::Context;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5use std::fs;
6use std::path::{Path, PathBuf};
7
8#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
9#[serde(rename_all = "snake_case")]
10pub enum WorkflowSourceKind {
11 BuiltIn,
12 Pack,
13 Workspace,
14}
15
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
17pub struct WorkflowSourceRef {
18 pub kind: WorkflowSourceKind,
19 #[serde(default, skip_serializing_if = "Option::is_none")]
20 pub pack_id: Option<String>,
21 #[serde(default, skip_serializing_if = "Option::is_none")]
22 pub path: Option<String>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
26pub struct WorkflowActionSpec {
27 pub action: String,
28 #[serde(default, skip_serializing_if = "Option::is_none")]
29 pub with: Option<Value>,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
33pub struct WorkflowStepSpec {
34 pub step_id: String,
35 pub action: String,
36 #[serde(default, skip_serializing_if = "Option::is_none")]
37 pub with: Option<Value>,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
41pub struct WorkflowHookBinding {
42 pub binding_id: String,
43 pub workflow_id: String,
44 pub event: String,
45 #[serde(default = "default_true")]
46 pub enabled: bool,
47 #[serde(default)]
48 pub actions: Vec<WorkflowActionSpec>,
49 #[serde(default, skip_serializing_if = "Option::is_none")]
50 pub source: Option<WorkflowSourceRef>,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
54pub struct WorkflowSpec {
55 pub workflow_id: String,
56 pub name: String,
57 #[serde(default, skip_serializing_if = "Option::is_none")]
58 pub description: Option<String>,
59 #[serde(default = "default_true")]
60 pub enabled: bool,
61 #[serde(default)]
62 pub steps: Vec<WorkflowStepSpec>,
63 #[serde(default)]
64 pub hooks: Vec<WorkflowHookBinding>,
65 #[serde(default, skip_serializing_if = "Option::is_none")]
66 pub source: Option<WorkflowSourceRef>,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
70pub struct WorkflowRegistry {
71 #[serde(default)]
72 pub workflows: HashMap<String, WorkflowSpec>,
73 #[serde(default)]
74 pub hooks: Vec<WorkflowHookBinding>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
78#[serde(rename_all = "snake_case")]
79pub enum WorkflowRunStatus {
80 Queued,
81 Running,
82 Completed,
83 Failed,
84 DryRun,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
88#[serde(rename_all = "snake_case")]
89pub enum WorkflowActionRunStatus {
90 Pending,
91 Running,
92 Completed,
93 Failed,
94 Skipped,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
98pub struct WorkflowActionRunRecord {
99 pub action_id: String,
100 pub action: String,
101 #[serde(default, skip_serializing_if = "Option::is_none")]
102 pub task_id: Option<String>,
103 pub status: WorkflowActionRunStatus,
104 #[serde(default, skip_serializing_if = "Option::is_none")]
105 pub detail: Option<String>,
106 #[serde(default, skip_serializing_if = "Option::is_none")]
107 pub output: Option<Value>,
108 pub updated_at_ms: u64,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
112pub struct WorkflowRunRecord {
113 pub run_id: String,
114 pub workflow_id: String,
115 #[serde(default, skip_serializing_if = "Option::is_none")]
116 pub binding_id: Option<String>,
117 #[serde(default, skip_serializing_if = "Option::is_none")]
118 pub trigger_event: Option<String>,
119 #[serde(default, skip_serializing_if = "Option::is_none")]
120 pub source_event_id: Option<String>,
121 #[serde(default, skip_serializing_if = "Option::is_none")]
122 pub task_id: Option<String>,
123 pub status: WorkflowRunStatus,
124 pub created_at_ms: u64,
125 pub updated_at_ms: u64,
126 #[serde(default, skip_serializing_if = "Option::is_none")]
127 pub finished_at_ms: Option<u64>,
128 #[serde(default)]
129 pub actions: Vec<WorkflowActionRunRecord>,
130 #[serde(default, skip_serializing_if = "Option::is_none")]
131 pub source: Option<WorkflowSourceRef>,
132}
133
134#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
135pub struct WorkflowSimulationResult {
136 #[serde(default)]
137 pub matched_bindings: Vec<WorkflowHookBinding>,
138 #[serde(default)]
139 pub planned_actions: Vec<WorkflowActionSpec>,
140 #[serde(default)]
141 pub canonical_events: Vec<String>,
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
145#[serde(rename_all = "snake_case")]
146pub enum WorkflowValidationSeverity {
147 Info,
148 Warning,
149 Error,
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
153pub struct WorkflowValidationMessage {
154 pub severity: WorkflowValidationSeverity,
155 pub message: String,
156}
157
158#[derive(Debug, Clone)]
159pub struct WorkflowLoadSource {
160 pub root: PathBuf,
161 pub kind: WorkflowSourceKind,
162 pub pack_id: Option<String>,
163}
164
165#[derive(Debug, Deserialize)]
166struct WorkflowFileEnvelope {
167 #[serde(default)]
168 workflow: Option<WorkflowFileShape>,
169 #[serde(default)]
170 hooks: Option<Value>,
171}
172
173#[derive(Debug, Deserialize)]
174struct WorkflowFileShape {
175 #[serde(default)]
176 id: Option<String>,
177 #[serde(default)]
178 workflow_id: Option<String>,
179 #[serde(default)]
180 name: Option<String>,
181 #[serde(default)]
182 description: Option<String>,
183 #[serde(default)]
184 enabled: Option<bool>,
185 #[serde(default)]
186 steps: Vec<WorkflowStepInput>,
187 #[serde(default)]
188 hooks: Option<Value>,
189}
190
191#[derive(Debug, Deserialize)]
192#[serde(untagged)]
193enum WorkflowStepInput {
194 String(String),
195 Object(WorkflowStepObjectInput),
196}
197
198#[derive(Debug, Deserialize)]
199struct WorkflowStepObjectInput {
200 #[serde(default)]
201 id: Option<String>,
202 #[serde(default)]
203 step_id: Option<String>,
204 action: String,
205 #[serde(default)]
206 with: Option<Value>,
207}
208
209#[derive(Debug, Deserialize)]
210#[serde(untagged)]
211enum HookFileShape {
212 Map(HashMap<String, Vec<HookActionInput>>),
213 List(Vec<HookBindingInput>),
214}
215
216#[derive(Debug, Deserialize)]
217struct HookBindingInput {
218 #[serde(default)]
219 id: Option<String>,
220 #[serde(default)]
221 binding_id: Option<String>,
222 #[serde(default)]
223 workflow: Option<String>,
224 #[serde(default)]
225 workflow_id: Option<String>,
226 event: String,
227 #[serde(default)]
228 enabled: Option<bool>,
229 #[serde(default)]
230 actions: Vec<HookActionInput>,
231}
232
233#[derive(Debug, Clone, Deserialize)]
234#[serde(untagged)]
235enum HookActionInput {
236 String(String),
237 Object(WorkflowActionSpec),
238}
239
240pub fn load_registry(sources: &[WorkflowLoadSource]) -> anyhow::Result<WorkflowRegistry> {
241 let mut registry = WorkflowRegistry::default();
242 for source in sources {
243 load_source_into(&mut registry, source)?;
244 }
245 Ok(registry)
246}
247
248pub fn validate_registry(registry: &WorkflowRegistry) -> Vec<WorkflowValidationMessage> {
249 let mut messages = Vec::new();
250 for workflow in registry.workflows.values() {
251 if workflow.steps.is_empty()
252 && registry
253 .hooks
254 .iter()
255 .all(|hook| hook.workflow_id != workflow.workflow_id)
256 {
257 messages.push(WorkflowValidationMessage {
258 severity: WorkflowValidationSeverity::Warning,
259 message: format!(
260 "workflow `{}` has no steps and no hook bindings",
261 workflow.workflow_id
262 ),
263 });
264 }
265 for step in &workflow.steps {
266 if step.action.trim().is_empty() {
267 messages.push(WorkflowValidationMessage {
268 severity: WorkflowValidationSeverity::Error,
269 message: format!(
270 "workflow `{}` has step `{}` with empty action",
271 workflow.workflow_id, step.step_id
272 ),
273 });
274 }
275 }
276 }
277 for hook in ®istry.hooks {
278 if !registry.workflows.contains_key(&hook.workflow_id) {
279 messages.push(WorkflowValidationMessage {
280 severity: WorkflowValidationSeverity::Error,
281 message: format!(
282 "hook `{}` references unknown workflow `{}`",
283 hook.binding_id, hook.workflow_id
284 ),
285 });
286 }
287 if hook.actions.is_empty() {
288 messages.push(WorkflowValidationMessage {
289 severity: WorkflowValidationSeverity::Warning,
290 message: format!("hook `{}` has no actions", hook.binding_id),
291 });
292 }
293 }
294 messages
295}
296
297fn load_source_into(
298 registry: &mut WorkflowRegistry,
299 source: &WorkflowLoadSource,
300) -> anyhow::Result<()> {
301 for entry in collect_yaml_files(&source.root.join("workflows"))? {
302 let workflow = load_workflow_file(&entry, source)?;
303 registry
304 .workflows
305 .insert(workflow.workflow_id.clone(), workflow.clone());
306 registry.hooks.retain(|hook| hook.workflow_id != workflow.workflow_id || !matches!(hook.source.as_ref(), Some(src) if src.path.as_deref() == Some(&entry.to_string_lossy().to_string())));
307 registry.hooks.extend(workflow.hooks.clone());
308 }
309 for entry in collect_yaml_files(&source.root.join("hooks"))? {
310 registry.hooks.extend(load_hook_file(&entry, source)?);
311 }
312 Ok(())
313}
314
315fn collect_yaml_files(dir: &Path) -> anyhow::Result<Vec<PathBuf>> {
316 let mut files = Vec::new();
317 let entries = match fs::read_dir(dir) {
318 Ok(entries) => entries,
319 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(files),
320 Err(err) => return Err(err.into()),
321 };
322 for entry in entries {
323 let path = entry?.path();
324 if path.is_dir() {
325 files.extend(collect_yaml_files(&path)?);
326 continue;
327 }
328 let ext = path
329 .extension()
330 .and_then(|v| v.to_str())
331 .unwrap_or_default();
332 if matches!(ext, "yaml" | "yml") {
333 files.push(path);
334 }
335 }
336 files.sort();
337 Ok(files)
338}
339
340fn load_workflow_file(path: &Path, source: &WorkflowLoadSource) -> anyhow::Result<WorkflowSpec> {
341 let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
342 let parsed = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
343 .with_context(|| format!("parse workflow yaml {}", path.display()))?;
344 let workflow = parsed
345 .workflow
346 .ok_or_else(|| anyhow::anyhow!("missing `workflow` key"))?;
347 let workflow_id = workflow
348 .workflow_id
349 .or(workflow.id)
350 .or_else(|| {
351 path.file_stem()
352 .and_then(|v| v.to_str())
353 .map(ToString::to_string)
354 })
355 .ok_or_else(|| anyhow::anyhow!("workflow id missing"))?;
356 let name = workflow.name.clone().unwrap_or_else(|| workflow_id.clone());
357 let source_ref = source_ref(source, path);
358 let steps = workflow
359 .steps
360 .into_iter()
361 .enumerate()
362 .map(|(idx, step)| match step {
363 WorkflowStepInput::String(action) => WorkflowStepSpec {
364 step_id: format!("step_{}", idx + 1),
365 action,
366 with: None,
367 },
368 WorkflowStepInput::Object(step) => WorkflowStepSpec {
369 step_id: step
370 .step_id
371 .or(step.id)
372 .unwrap_or_else(|| format!("step_{}", idx + 1)),
373 action: step.action,
374 with: step.with,
375 },
376 })
377 .collect::<Vec<_>>();
378 let mut hooks = parse_hooks_value(
379 workflow.hooks.as_ref().or(parsed.hooks.as_ref()),
380 &workflow_id,
381 &source_ref,
382 )?;
383 for hook in &mut hooks {
384 if hook.workflow_id.is_empty() {
385 hook.workflow_id = workflow_id.clone();
386 }
387 }
388 Ok(WorkflowSpec {
389 workflow_id,
390 name,
391 description: workflow.description,
392 enabled: workflow.enabled.unwrap_or(true),
393 steps,
394 hooks,
395 source: Some(source_ref),
396 })
397}
398
399fn load_hook_file(
400 path: &Path,
401 source: &WorkflowLoadSource,
402) -> anyhow::Result<Vec<WorkflowHookBinding>> {
403 let raw = fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
404 let env = serde_yaml::from_str::<WorkflowFileEnvelope>(&raw)
405 .with_context(|| format!("parse hook yaml {}", path.display()))?;
406 parse_hooks_value(env.hooks.as_ref(), "", &source_ref(source, path))
407}
408
409fn parse_hooks_value(
410 hooks_value: Option<&Value>,
411 default_workflow_id: &str,
412 source_ref: &WorkflowSourceRef,
413) -> anyhow::Result<Vec<WorkflowHookBinding>> {
414 let Some(hooks_value) = hooks_value else {
415 return Ok(Vec::new());
416 };
417 let shape = serde_json::from_value::<HookFileShape>(hooks_value.clone())
418 .or_else(|_| serde_yaml::from_value::<HookFileShape>(serde_yaml::to_value(hooks_value)?))
419 .context("parse hooks")?;
420 let mut out = Vec::new();
421 match shape {
422 HookFileShape::Map(map) => {
423 for (event, actions) in map {
424 out.push(WorkflowHookBinding {
425 binding_id: format!(
426 "{}.{}",
427 default_workflow_id_or_default(default_workflow_id),
428 normalize_ident(&event)
429 ),
430 workflow_id: default_workflow_id.to_string(),
431 event,
432 enabled: true,
433 actions: actions.into_iter().map(to_action_spec).collect(),
434 source: Some(source_ref.clone()),
435 });
436 }
437 }
438 HookFileShape::List(items) => {
439 for item in items {
440 out.push(WorkflowHookBinding {
441 binding_id: item.binding_id.or(item.id).unwrap_or_else(|| {
442 format!(
443 "{}.{}",
444 item.workflow_id
445 .clone()
446 .or(item.workflow.clone())
447 .unwrap_or_else(|| default_workflow_id_or_default(
448 default_workflow_id
449 )),
450 normalize_ident(&item.event)
451 )
452 }),
453 workflow_id: item
454 .workflow_id
455 .or(item.workflow)
456 .unwrap_or_else(|| default_workflow_id.to_string()),
457 event: item.event,
458 enabled: item.enabled.unwrap_or(true),
459 actions: item.actions.into_iter().map(to_action_spec).collect(),
460 source: Some(source_ref.clone()),
461 });
462 }
463 }
464 }
465 Ok(out)
466}
467
468fn default_workflow_id_or_default(workflow_id: &str) -> String {
469 if workflow_id.trim().is_empty() {
470 "workflow".to_string()
471 } else {
472 workflow_id.to_string()
473 }
474}
475
476fn to_action_spec(input: HookActionInput) -> WorkflowActionSpec {
477 match input {
478 HookActionInput::String(action) => WorkflowActionSpec { action, with: None },
479 HookActionInput::Object(spec) => spec,
480 }
481}
482
483fn normalize_ident(input: &str) -> String {
484 input
485 .trim()
486 .to_ascii_lowercase()
487 .replace([' ', '/', '.'], "_")
488}
489
490fn source_ref(source: &WorkflowLoadSource, path: &Path) -> WorkflowSourceRef {
491 WorkflowSourceRef {
492 kind: source.kind.clone(),
493 pack_id: source.pack_id.clone(),
494 path: Some(path.to_string_lossy().to_string()),
495 }
496}
497
498fn default_true() -> bool {
499 true
500}
501
502#[cfg(test)]
503mod tests {
504 use super::*;
505 use tempfile::tempdir;
506
507 #[test]
508 fn loads_workflow_with_embedded_hooks() {
509 let dir = tempdir().expect("dir");
510 let workflows_dir = dir.path().join("workflows");
511 fs::create_dir_all(&workflows_dir).expect("mkdir");
512 fs::write(
513 workflows_dir.join("demo.yaml"),
514 r#"
515workflow:
516 id: build_feature
517 name: Build Feature
518 steps:
519 - planner
520 - action: verifier.run
521 with:
522 strict: true
523 hooks:
524 task_created:
525 - kanban.update
526 - action: slack.notify
527 with:
528 channel: engineering
529"#,
530 )
531 .expect("write");
532 let registry = load_registry(&[WorkflowLoadSource {
533 root: dir.path().to_path_buf(),
534 kind: WorkflowSourceKind::Workspace,
535 pack_id: None,
536 }])
537 .expect("registry");
538 let workflow = registry.workflows.get("build_feature").expect("workflow");
539 assert_eq!(workflow.steps.len(), 2);
540 assert_eq!(registry.hooks.len(), 1);
541 assert_eq!(registry.hooks[0].actions.len(), 2);
542 }
543}