Skip to main content

wfe_core/models/
workflow_definition.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use serde::{Deserialize, Serialize};
5
6use super::condition::StepCondition;
7use super::error_behavior::ErrorBehavior;
8use super::service::ServiceDefinition;
9
10/// Declaration of a volume that persists across every step in a workflow
11/// run, including sub-workflows started via `type: workflow` steps. Backends
12/// that support it (currently just Kubernetes) provision a single volume
13/// per top-level workflow instance and mount it on every step container at
14/// `mount_path`. Sub-workflows see the same volume because they share the
15/// parent's isolation domain (namespace, in the K8s case).
16///
17/// Declared once on the top-level workflow (e.g. `ci`) that orchestrates
18/// the sub-workflows. Declarations on non-root workflows are ignored in
19/// favor of the root's declaration.
20#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
21pub struct SharedVolume {
22    /// Absolute path the volume is mounted at inside every step container.
23    /// Typical value: `/workspace`.
24    pub mount_path: String,
25    /// Optional size override (e.g. `"20Gi"`). When unset the backend falls
26    /// back to its configured default (ClusterConfig::default_shared_volume_size
27    /// for the Kubernetes executor).
28    #[serde(default, skip_serializing_if = "Option::is_none")]
29    pub size: Option<String>,
30}
31
32impl Default for SharedVolume {
33    fn default() -> Self {
34        Self {
35            mount_path: "/workspace".to_string(),
36            size: None,
37        }
38    }
39}
40
41/// A compiled workflow definition ready for execution.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct WorkflowDefinition {
44    /// Stable slug used as the primary key (e.g. "ci", "checkout"). Must be
45    /// unique within a host. Referenced by other workflows, webhooks, and
46    /// clients when starting new instances.
47    pub id: String,
48    /// Optional human-friendly display name surfaced in UIs, listings, and
49    /// logs (e.g. "Continuous Integration"). Falls back to `id` when unset.
50    #[serde(default, skip_serializing_if = "Option::is_none")]
51    pub name: Option<String>,
52    /// Version.
53    pub version: u32,
54    /// Description.
55    pub description: Option<String>,
56    /// Steps.
57    pub steps: Vec<WorkflowStep>,
58    /// Default error behavior.
59    pub default_error_behavior: ErrorBehavior,
60    #[serde(default, with = "super::option_duration_millis")]
61    /// Default error retry interval.
62    pub default_error_retry_interval: Option<Duration>,
63    /// Infrastructure services required by this workflow (databases, caches, etc.).
64    #[serde(default, skip_serializing_if = "Vec::is_empty")]
65    pub services: Vec<ServiceDefinition>,
66    /// When set, the backend provisions a single persistent volume for the
67    /// top-level workflow instance and mounts it on every step container.
68    /// All sub-workflows inherit the same volume through their shared
69    /// namespace/isolation domain. Sub-workflow declarations are ignored.
70    #[serde(default, skip_serializing_if = "Option::is_none")]
71    pub shared_volume: Option<SharedVolume>,
72}
73
74impl WorkflowDefinition {
75    pub fn new(id: impl Into<String>, version: u32) -> Self {
76        Self {
77            id: id.into(),
78            name: None,
79            version,
80            description: None,
81            steps: Vec::new(),
82            default_error_behavior: ErrorBehavior::default(),
83            default_error_retry_interval: None,
84            services: Vec::new(),
85            shared_volume: None,
86        }
87    }
88
89    /// Return the display name when set, otherwise fall back to the slug id.
90    pub fn display_name(&self) -> &str {
91        self.name.as_deref().unwrap_or(&self.id)
92    }
93
94    /// Write a Graphviz DOT representation of this workflow to the given writer.
95    ///
96    /// The output is a directed graph showing:
97    ///
98    /// | Visual element | Meaning |
99    /// |----------------|---------|
100    /// | **Node** | One per step, labeled with name and type |
101    /// | **Solid edge** | Sequential execution flow (`outcomes`) |
102    /// | **Dotted gray edge** | Container/child relationship |
103    /// | **Dashed red edge** | Compensation path |
104    ///
105    /// Container steps (parallel, if, while, foreach, saga, decide) are drawn
106    /// with a distinct fill colour so you can spot control-flow primitives at a
107    /// glance.
108    ///
109    /// # Example
110    ///
111    /// ```ignore
112    /// let definition = WorkflowBuilder::<MyData>::new()
113    ///     .start_with::<StepA>()
114    ///     .then::<StepB>()
115    ///     .end_workflow()
116    ///     .build("demo", 1);
117    ///
118    /// let dot = definition.to_dot();
119    /// std::fs::write("workflow.dot", &dot).unwrap();
120    /// ```
121    pub fn write_dot<W: std::fmt::Write>(&self, w: &mut W) -> std::fmt::Result {
122        let display = self.display_name();
123        writeln!(w, "digraph {} {{", dot_escape_id(&self.id))?;
124        writeln!(w, "    label={}", dot_escape(&format!("{} v{}", display, self.version)))?;
125        writeln!(w, "    labelloc=t")?;
126        writeln!(w, "    fontsize=14")?;
127        writeln!(w, "    fontname=\"Helvetica-Bold\"")?;
128        writeln!(w, "    rankdir=TB")?;
129        writeln!(w, "    graph [fontname=\"Helvetica\", fontsize=10, bgcolor=\"white\", margin=20]")?;
130        writeln!(
131            w,
132            "    node [fontname=\"Helvetica\", shape=box, style=\"rounded,filled\", fillcolor=\"#f3f4f6\", fontsize=9, penwidth=1.0]"
133        )?;
134        writeln!(
135            w,
136            "    edge [fontname=\"Helvetica\", fontsize=8, color=\"#374151\", penwidth=1.0]"
137        )?;
138        writeln!(w)?;
139
140        // --- nodes ---
141        for step in &self.steps {
142            let label = self.step_dot_label(step);
143            let fill = step_fill_color(step);
144            let shape = step_shape(step);
145            let mut attrs: Vec<String> = Vec::new();
146
147            attrs.push(format!("label={}", label));
148            attrs.push(format!("shape={}", shape));
149            attrs.push(format!("fillcolor=\"{}\"", fill));
150
151            if step.when.is_some() {
152                attrs.push("style=\"rounded,filled,dashed\"".to_string());
153                attrs.push("color=\"#059669\"".to_string()); // green border for condition
154            }
155
156            if step.error_behavior.is_some() {
157                attrs.push("penwidth=1.5".to_string());
158            }
159
160            write!(w, "    N{} [", step.id)?;
161            write!(w, "{}", attrs.join(", "))?;
162            writeln!(w, "];")?;
163        }
164
165        writeln!(w)?;
166
167        // --- outcome edges (sequential flow) ---
168        for step in &self.steps {
169            for outcome in &step.outcomes {
170                let mut edge_attrs: Vec<String> = Vec::new();
171
172                if let Some(ref lbl) = outcome.label {
173                    edge_attrs.push(format!("label={}", dot_escape(lbl)));
174                } else if let Some(ref val) = outcome.value {
175                    let txt = truncate(&format!("{}", val), 40);
176                    edge_attrs.push(format!("label={}", dot_escape(&txt)));
177                }
178
179                write!(w, "    N{} -> N{}", step.id, outcome.next_step)?;
180                if !edge_attrs.is_empty() {
181                    write!(w, " [{}]", edge_attrs.join(", "))?;
182                }
183                writeln!(w, ";")?;
184            }
185        }
186
187        // --- child edges (containment) ---
188        for step in &self.steps {
189            for &child_id in &step.children {
190                writeln!(
191                    w,
192                    "    N{} -> N{} [style=dotted, color=\"#9ca3af\", arrowhead=none, penwidth=1.2];",
193                    step.id, child_id
194                )?;
195            }
196        }
197
198        // --- compensation edges ---
199        for step in &self.steps {
200            if let Some(comp_id) = step.compensation_step_id {
201                writeln!(
202                    w,
203                    "    N{} -> N{} [style=dashed, color=\"#dc2626\", label=\"compensate\", fontcolor=\"#dc2626\", penwidth=1.2];",
204                    step.id, comp_id
205                )?;
206            }
207        }
208
209        writeln!(w, "}}")?;
210        Ok(())
211    }
212
213    /// Return a Graphviz DOT representation as a `String`.
214    pub fn to_dot(&self) -> String {
215        let mut buf = String::new();
216        self.write_dot(&mut buf)
217            .expect("writing to String is infallible");
218        buf
219    }
220
221    // ------------------------------------------------------------------
222    // helpers
223    // ------------------------------------------------------------------
224
225    fn step_dot_label(&self, step: &WorkflowStep) -> String {
226        let mut lines: Vec<String> = Vec::new();
227
228        // Name (if set)
229        if let Some(ref name) = step.name {
230            lines.push(format!(
231                "<TR><TD><B>{}</B></TD></TR>",
232                html_escape(name)
233            ));
234        }
235
236        // Type (always shown, cleaned up)
237        let type_name = clean_type_name(&step.step_type);
238        lines.push(format!(
239            "<TR><TD><FONT POINT-SIZE=\"8\" COLOR=\"#6b7280\">{}</FONT></TD></TR>",
240            html_escape(&type_name)
241        ));
242
243        // Condition badge
244        if step.when.is_some() {
245            lines.push(
246                "<TR><TD><FONT POINT-SIZE=\"7\" COLOR=\"#059669\">[when]</FONT></TD></TR>"
247                    .to_string(),
248            );
249        }
250
251        // Error behaviour badge
252        if let Some(ref eb) = step.error_behavior {
253            lines.push(format!(
254                "<TR><TD><FONT POINT-SIZE=\"7\" COLOR=\"#dc2626\">[{:?}]</FONT></TD></TR>",
255                eb
256            ));
257        }
258
259        format!(
260            "<\n  <TABLE BORDER=\"0\" CELLBORDER=\"0\" CELLSPACING=\"1\" CELLPADDING=\"2\">\n    {}\n  </TABLE>\n>",
261            lines.join("\n    ")
262        )
263    }
264}
265
266/// A single step in a workflow definition.
267#[derive(Debug, Clone, Serialize, Deserialize)]
268pub struct WorkflowStep {
269    /// Id.
270    pub id: usize,
271    /// Name.
272    pub name: Option<String>,
273    /// External id.
274    pub external_id: Option<String>,
275    /// Step type.
276    pub step_type: String,
277    /// Children.
278    pub children: Vec<usize>,
279    /// Outcomes.
280    pub outcomes: Vec<StepOutcome>,
281    /// Error behavior.
282    pub error_behavior: Option<ErrorBehavior>,
283    /// Compensation step id.
284    pub compensation_step_id: Option<usize>,
285    /// Do compensate.
286    pub do_compensate: bool,
287    #[serde(default)]
288    /// Saga.
289    pub saga: bool,
290    /// Serializable configuration for primitive steps (e.g. event_name, duration).
291    #[serde(default, skip_serializing_if = "Option::is_none")]
292    pub step_config: Option<serde_json::Value>,
293    /// Optional condition that must evaluate to true for this step to execute.
294    #[serde(default, skip_serializing_if = "Option::is_none")]
295    pub when: Option<StepCondition>,
296}
297
298impl WorkflowStep {
299    pub fn new(id: usize, step_type: impl Into<String>) -> Self {
300        Self {
301            id,
302            name: None,
303            external_id: None,
304            step_type: step_type.into(),
305            children: Vec::new(),
306            outcomes: Vec::new(),
307            error_behavior: None,
308            compensation_step_id: None,
309            do_compensate: false,
310            saga: false,
311            step_config: None,
312            when: None,
313        }
314    }
315
316    /// Extract artifact input declarations from the step's raw configuration.
317    ///
318    /// Looks for `config.inputs` (a map of name → target path) and
319    /// `config.input` (a single artifact name, used by buildkit). Returns
320    /// a map of input name → target path. For `config.input`, the target
321    /// path is empty since buildkit uses the artifact as a build context
322    /// override rather than a mount point.
323    pub fn artifact_inputs(&self) -> HashMap<String, String> {
324        let mut result = HashMap::new();
325
326        let config = match self.step_config.as_ref() {
327            Some(c) => c,
328            None => return result,
329        };
330
331        // Map form: inputs: { repo: /workspace/repo }
332        if let Some(inputs) = config.get("inputs").and_then(|v| v.as_object()) {
333            for (k, v) in inputs {
334                if let Some(s) = v.as_str() {
335                    result.insert(k.clone(), s.to_string());
336                }
337            }
338        }
339
340        // Single form: input: repo (buildkit)
341        if let Some(input) = config.get("input").and_then(|v| v.as_str()) {
342            result.insert(input.to_string(), String::new());
343        }
344
345        result
346    }
347}
348
349// ------------------------------------------------------------------
350// DOT rendering helpers
351// ------------------------------------------------------------------
352
/// Wrap `s` in double quotes for use as a DOT quoted identifier or label.
///
/// Backslashes and double quotes are backslash-escaped, and each newline
/// character becomes the two-character sequence `\n`.
fn dot_escape(s: &str) -> String {
    // +2 for the surrounding quotes; escapes may grow the buffer further.
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    for ch in s.chars() {
        match ch {
            '\\' => quoted.push_str("\\\\"),
            '"' => quoted.push_str("\\\""),
            '\n' => quoted.push_str("\\n"),
            other => quoted.push(other),
        }
    }
    quoted.push('"');
    quoted
}
358
359/// Escape a string for use as a DOT graph ID. Same rules as `dot_escape`.
360fn dot_escape_id(s: &str) -> String {
361    dot_escape(s)
362}
363
/// Replace the characters that are special inside DOT HTML-like labels
/// (`&`, `<`, `>` and `"`) with their entity references.
fn html_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            other => escaped.push(other),
        }
    }
    escaped
}
371
/// Reduce a (possibly fully qualified) type path to just the type name.
///
/// Works in two passes:
///
/// 1. Every occurrence of a well-known crate-path fragment is removed, in
///    the order listed. The fragments are removed wherever they occur in
///    the string, not only at its start.
/// 2. If any `::` separator remains, only the text after the last one is
///    kept.
fn clean_type_name(type_name: &str) -> String {
    const KNOWN_PREFIXES: [&str; 7] = [
        "wfe_core::primitives::",
        "wfe_core::",
        "crate::workflows::primitives::",
        "crate::workflows::steps::",
        "crate::workflows::up::steps::",
        "crate::workflows::down::steps::",
        "crate::",
    ];

    // Pass 1: drop the known fragments one after another.
    let stripped = KNOWN_PREFIXES
        .iter()
        .fold(type_name.to_string(), |acc, prefix| acc.replace(prefix, ""));

    // Pass 2: keep only the final path segment, if a path is still present.
    match stripped.rsplit_once("::") {
        Some((_, last_segment)) => last_segment.to_string(),
        None => stripped,
    }
}
394
395/// Return a Graphviz fill colour for a step based on its type.
396fn step_fill_color(step: &WorkflowStep) -> &'static str {
397    if step.saga {
398        return "#ede9fe"; // light purple
399    }
400
401    let t = &step.step_type;
402    if t.contains("SequenceStep") {
403        "#dbeafe" // light blue   – parallel container
404    } else if t.contains("IfStep") {
405        "#d1fae5" // light green  – conditional
406    } else if t.contains("WhileStep") {
407        "#fef3c7" // light yellow – loop
408    } else if t.contains("ForEachStep") {
409        "#ffedd5" // light orange – iteration
410    } else if t.contains("SagaContainerStep") {
411        "#ede9fe" // light purple – saga
412    } else if t.contains("DecideStep") {
413        "#ccfbf1" // light teal   – multi-way branch
414    } else if t.contains("WaitForStep") {
415        "#fce7f3" // light pink   – event wait
416    } else if t.contains("DelayStep") || t.contains("ScheduleStep") {
417        "#f3e8ff" // light violet – time-based
418    } else if t.contains("EndStep") {
419        "#e5e7eb" // medium gray  – terminal
420    } else if t.contains("SubWorkflowStep") {
421        "#e0f2fe" // light sky    – sub-workflow
422    } else {
423        "#f3f4f6" // light gray   – default leaf step
424    }
425}
426
427/// Return a Graphviz shape for a step.
428fn step_shape(step: &WorkflowStep) -> &'static str {
429    let t = &step.step_type;
430    if t.contains("EndStep") {
431        "oval"
432    } else if t.contains("DecideStep") {
433        "diamond"
434    } else if t.contains("SequenceStep")
435        || t.contains("IfStep")
436        || t.contains("WhileStep")
437        || t.contains("ForEachStep")
438        || t.contains("SagaContainerStep")
439    {
440        "box3d"
441    } else {
442        "box"
443    }
444}
445
/// Truncate a string to at most `max_len` characters, appending "…" if
/// truncated.
///
/// Lengths are counted in Unicode scalar values (`char`s), not bytes, so
/// multi-byte text is never cut in the middle of a character. When the
/// input is longer than `max_len` characters, the result consists of the
/// first `max_len - 1` characters followed by "…". With `max_len == 0` a
/// non-empty input yields just "…".
fn truncate(s: &str, max_len: usize) -> String {
    // If there is no character at index `max_len`, the string already fits.
    if s.chars().nth(max_len).is_none() {
        return s.to_string();
    }

    // Reserve one character of the budget for the ellipsis.
    let keep = max_len.saturating_sub(1);
    let mut shortened: String = s.chars().take(keep).collect();
    shortened.push('…');
    shortened
}
454
455/// Routing outcome from a step.
456#[derive(Debug, Clone, Serialize, Deserialize)]
457pub struct StepOutcome {
458    /// Next step.
459    pub next_step: usize,
460    /// Label.
461    pub label: Option<String>,
462    /// Value.
463    pub value: Option<serde_json::Value>,
464}
465
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// Build an outcome that routes to `next_step` with no label or value.
    fn edge_to(next_step: usize) -> StepOutcome {
        StepOutcome {
            next_step,
            label: None,
            value: None,
        }
    }

    #[test]
    fn definition_defaults() {
        let def = WorkflowDefinition::new("test-workflow", 1);
        assert_eq!(def.id, "test-workflow");
        assert_eq!(def.version, 1);
        assert!(def.steps.is_empty());
        assert_eq!(def.default_error_behavior, ErrorBehavior::default());
        assert!(def.default_error_retry_interval.is_none());
    }

    #[test]
    fn step_defaults() {
        let step = WorkflowStep::new(0, "MyStep");
        assert_eq!(step.id, 0);
        assert_eq!(step.step_type, "MyStep");
        assert!(step.children.is_empty());
        assert!(step.outcomes.is_empty());
        assert!(step.error_behavior.is_none());
        assert!(step.compensation_step_id.is_none());
    }

    #[test]
    fn definition_serde_round_trip() {
        let mut def = WorkflowDefinition::new("wf", 3);
        let mut step = WorkflowStep::new(0, "StepA");
        step.outcomes.push(StepOutcome {
            next_step: 1,
            label: Some("next".into()),
            value: None,
        });
        def.steps.push(step);
        def.steps.push(WorkflowStep::new(1, "StepB"));

        let json = serde_json::to_string(&def).unwrap();
        let deserialized: WorkflowDefinition = serde_json::from_str(&json).unwrap();
        assert_eq!(def.id, deserialized.id);
        assert_eq!(def.steps.len(), deserialized.steps.len());
        // Check the value that came back, not the one that went in.
        assert_eq!(deserialized.steps[0].outcomes[0].next_step, 1);
        assert_eq!(
            deserialized.steps[0].outcomes[0].label.as_deref(),
            Some("next")
        );
    }

    // ------------------------------------------------------------------
    // DOT output tests
    // ------------------------------------------------------------------

    #[test]
    fn dot_empty_workflow() {
        let def = WorkflowDefinition::new("empty", 1);
        let dot = def.to_dot();
        assert!(dot.starts_with("digraph \"empty\" {"));
        assert!(dot.contains("label=\"empty v1\""));
        assert!(dot.ends_with("}\n"));
    }

    #[test]
    fn dot_simple_chain() {
        let mut def = WorkflowDefinition::new("chain", 1);
        let mut step0 = WorkflowStep::new(0, "StepA");
        step0.outcomes.push(edge_to(1));
        def.steps.push(step0);
        def.steps.push(WorkflowStep::new(1, "StepB"));

        let dot = def.to_dot();
        assert!(dot.contains("N0 ["));
        assert!(dot.contains("N1 ["));
        assert!(dot.contains("N0 -> N1"));
    }

    #[test]
    fn dot_parallel_branches() {
        let mut def = WorkflowDefinition::new("parallel", 1);
        // Step 0: leaf
        let mut step0 = WorkflowStep::new(0, "Start");
        step0.outcomes.push(edge_to(1));
        def.steps.push(step0);

        // Step 1: SequenceStep (parallel container)
        let mut seq = WorkflowStep::new(1, "wfe_core::primitives::sequence::SequenceStep");
        seq.children = vec![2, 3];
        seq.outcomes.push(edge_to(4));
        def.steps.push(seq);

        // Steps 2 and 3: branch bodies
        let mut step2 = WorkflowStep::new(2, "BranchA");
        step2.outcomes.push(edge_to(4));
        def.steps.push(step2);

        let mut step3 = WorkflowStep::new(3, "BranchB");
        step3.outcomes.push(edge_to(4));
        def.steps.push(step3);

        // Step 4: end
        def.steps.push(WorkflowStep::new(4, "End"));

        let dot = def.to_dot();
        // Container should be box3d and light blue
        assert!(dot.contains("shape=box3d"));
        assert!(dot.contains("fillcolor=\"#dbeafe\""));
        // Child edges should be dotted
        assert!(dot.contains("N1 -> N2 [style=dotted"));
        assert!(dot.contains("N1 -> N3 [style=dotted"));
        // Sequential flow
        assert!(dot.contains("N0 -> N1"));
        assert!(dot.contains("N1 -> N4"));
    }

    #[test]
    fn dot_with_names() {
        let mut def = WorkflowDefinition::new("named", 1);
        let mut step = WorkflowStep::new(0, "sunbeam_sdk::workflows::up::steps::EnsureCilium");
        step.name = Some("ensure-cilium".into());
        def.steps.push(step);

        let dot = def.to_dot();
        // Name should appear in the label
        assert!(dot.contains("ensure-cilium"));
        // Type should be cleaned to just the basename
        assert!(dot.contains("EnsureCilium"));
        // Should NOT contain the full path
        assert!(!dot.contains("sunbeam_sdk::workflows::up::steps::"));
    }

    #[test]
    fn dot_compensation_edge() {
        let mut def = WorkflowDefinition::new("saga", 1);
        let mut step0 = WorkflowStep::new(0, "DoWork");
        step0.compensation_step_id = Some(1);
        def.steps.push(step0);
        def.steps.push(WorkflowStep::new(1, "UndoWork"));

        let dot = def.to_dot();
        assert!(dot.contains("N0 -> N1 [style=dashed, color=\"#dc2626\", label=\"compensate\""));
    }

    #[test]
    fn dot_condition_badge() {
        let mut def = WorkflowDefinition::new("conditional", 1);
        let mut step = WorkflowStep::new(0, "MaybeRun");
        step.when = Some(StepCondition::Comparison(
            crate::models::FieldComparison {
                field: ".skip".to_string(),
                operator: crate::models::ComparisonOp::Equals,
                value: Some(serde_json::json!(false)),
            },
        ));
        def.steps.push(step);

        let dot = def.to_dot();
        // Should have green border for condition
        assert!(dot.contains("color=\"#059669\""));
        assert!(dot.contains("[when]"));
    }

    #[test]
    fn dot_error_badge() {
        let mut def = WorkflowDefinition::new("err", 1);
        let mut step = WorkflowStep::new(0, "RiskyStep");
        step.error_behavior = Some(ErrorBehavior::Terminate);
        def.steps.push(step);

        let dot = def.to_dot();
        // Should have thicker border
        assert!(dot.contains("penwidth=1.5"));
        // Should show the error behaviour
        assert!(dot.contains("Terminate"));
    }

    #[test]
    fn clean_type_name_strips_prefixes() {
        assert_eq!(clean_type_name("wfe_core::primitives::sequence::SequenceStep"), "SequenceStep");
        assert_eq!(clean_type_name("crate::workflows::steps::EnsureCilium"), "EnsureCilium");
        assert_eq!(clean_type_name("MyCustomStep"), "MyCustomStep");
        assert_eq!(clean_type_name("a::b::c::DeepStep"), "DeepStep");
    }

    #[test]
    fn html_escape_special_chars() {
        assert_eq!(html_escape("foo & bar"), "foo &amp; bar");
        assert_eq!(html_escape("<tag>"), "&lt;tag&gt;");
        assert_eq!(html_escape("\"quoted\""), "&quot;quoted&quot;");
    }
}