Skip to main content

harn_vm/orchestration/
workflow_bundle.rs

1//! Portable workflow bundle contract and deterministic local receipts.
2
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::fs;
5use std::path::Path;
6
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9
10use super::{validate_workflow, WorkflowEdge, WorkflowGraph};
11
/// Bundle schema version. Validation rejects bundles that declare any other
/// value, and the same number is stamped on previews, graph exports, and run
/// receipts.
pub const WORKFLOW_BUNDLE_SCHEMA_VERSION: u32 = 1;
/// Value written to the `receipt_type` field of receipts produced by
/// `run_workflow_bundle`.
pub const WORKFLOW_BUNDLE_RECEIPT_TYPE: &'static str = "harn.workflow_bundle.run";
14
15#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
16#[serde(default)]
17pub struct WorkflowBundle {
18    pub schema_version: u32,
19    pub id: String,
20    pub name: Option<String>,
21    pub version: String,
22    pub triggers: Vec<WorkflowBundleTrigger>,
23    pub workflow: WorkflowGraph,
24    pub prompt_capsules: BTreeMap<String, PromptCapsule>,
25    pub policy: WorkflowBundlePolicy,
26    pub connectors: Vec<ConnectorRequirement>,
27    pub environment: EnvironmentRequirements,
28    pub receipts: WorkflowBundleReplayMetadata,
29    pub metadata: BTreeMap<String, serde_json::Value>,
30}
31
32#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
33#[serde(default)]
34pub struct WorkflowBundleTrigger {
35    pub id: String,
36    pub kind: String,
37    pub provider: Option<String>,
38    pub events: Vec<String>,
39    pub schedule: Option<String>,
40    pub delay: Option<String>,
41    pub webhook_path: Option<String>,
42    pub mcp_tool: Option<String>,
43    pub resume_key: Option<String>,
44    pub node_id: Option<String>,
45    pub metadata: BTreeMap<String, String>,
46}
47
48#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
49#[serde(default)]
50pub struct PromptCapsule {
51    pub id: String,
52    pub node_id: String,
53    pub trigger_id: Option<String>,
54    pub prompt: String,
55    pub system: Option<String>,
56    pub context: BTreeMap<String, serde_json::Value>,
57}
58
59#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
60#[serde(default)]
61pub struct WorkflowBundlePolicy {
62    pub autonomy_tier: String,
63    pub tool_policy: BTreeMap<String, serde_json::Value>,
64    pub approval_required: Vec<String>,
65    pub retry: RetryPolicySpec,
66    pub catchup: CatchupPolicySpec,
67}
68
69impl Default for WorkflowBundlePolicy {
70    fn default() -> Self {
71        Self {
72            autonomy_tier: "act_with_approval".to_string(),
73            tool_policy: BTreeMap::new(),
74            approval_required: Vec::new(),
75            retry: RetryPolicySpec::default(),
76            catchup: CatchupPolicySpec::default(),
77        }
78    }
79}
80
81#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
82#[serde(default)]
83pub struct RetryPolicySpec {
84    pub max_attempts: u32,
85    pub backoff: String,
86}
87
88impl Default for RetryPolicySpec {
89    fn default() -> Self {
90        Self {
91            max_attempts: 1,
92            backoff: "none".to_string(),
93        }
94    }
95}
96
97#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
98#[serde(default)]
99pub struct CatchupPolicySpec {
100    pub mode: String,
101    pub max_events: Option<u32>,
102}
103
104impl Default for CatchupPolicySpec {
105    fn default() -> Self {
106        Self {
107            mode: "latest".to_string(),
108            max_events: Some(1),
109        }
110    }
111}
112
113#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
114#[serde(default)]
115pub struct ConnectorRequirement {
116    pub id: String,
117    pub provider_id: String,
118    pub scopes: Vec<String>,
119    pub setup_required: bool,
120    pub status_required: bool,
121}
122
123#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
124#[serde(default)]
125pub struct EnvironmentRequirements {
126    pub repo_setup_profile: Option<String>,
127    pub worktree_policy: String,
128    pub command_gates: Vec<String>,
129}
130
131impl Default for EnvironmentRequirements {
132    fn default() -> Self {
133        Self {
134            repo_setup_profile: None,
135            worktree_policy: "host_managed".to_string(),
136            command_gates: Vec::new(),
137        }
138    }
139}
140
141#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
142#[serde(default)]
143pub struct WorkflowBundleReplayMetadata {
144    pub run_id: Option<String>,
145    pub event_ids: Vec<String>,
146    pub workflow_version: Option<usize>,
147    pub graph_digest: Option<String>,
148}
149
150#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
151#[serde(default)]
152pub struct WorkflowBundleDiagnostic {
153    pub severity: String,
154    pub path: String,
155    pub message: String,
156    pub node_id: Option<String>,
157}
158
159#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
160#[serde(default)]
161pub struct WorkflowBundleValidationReport {
162    pub valid: bool,
163    pub bundle_id: String,
164    pub workflow_id: String,
165    pub graph_digest: String,
166    pub errors: Vec<WorkflowBundleDiagnostic>,
167    pub warnings: Vec<WorkflowBundleDiagnostic>,
168}
169
170#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
171pub struct WorkflowBundlePreview {
172    pub schema_version: u32,
173    pub bundle_id: String,
174    pub bundle_version: String,
175    pub workflow_id: String,
176    pub workflow_version: usize,
177    pub graph_digest: String,
178    pub validation: WorkflowBundleValidationReport,
179    pub graph: WorkflowBundleGraphExport,
180    pub mermaid: String,
181    pub triggers: Vec<WorkflowBundleTrigger>,
182    pub connectors: Vec<ConnectorRequirement>,
183    pub environment: EnvironmentRequirements,
184    pub nodes: Vec<WorkflowBundlePreviewNode>,
185    pub edges: Vec<WorkflowEdge>,
186}
187
188#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
189pub struct WorkflowBundlePreviewNode {
190    pub id: String,
191    pub kind: String,
192    pub label: Option<String>,
193    pub prompt_capsule: Option<String>,
194    pub trigger_ids: Vec<String>,
195    pub outgoing: Vec<String>,
196}
197
198#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
199pub struct WorkflowBundleGraphExport {
200    pub schema_version: u32,
201    pub graph_id: String,
202    pub graph_digest: String,
203    pub nodes: Vec<WorkflowBundleGraphNode>,
204    pub edges: Vec<WorkflowBundleGraphEdge>,
205    pub diagnostics: Vec<WorkflowBundleGraphDiagnostic>,
206    pub editable_fields: Vec<WorkflowBundleEditableField>,
207    pub mermaid: String,
208}
209
210#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
211pub struct WorkflowBundleGraphNode {
212    pub id: String,
213    pub node_type: String,
214    pub label: String,
215    pub workflow_node_id: Option<String>,
216    pub trigger_id: Option<String>,
217    pub connector_id: Option<String>,
218    pub editable_fields: Vec<WorkflowBundleEditableField>,
219    pub metadata: BTreeMap<String, serde_json::Value>,
220}
221
222#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
223pub struct WorkflowBundleGraphEdge {
224    pub from: String,
225    pub to: String,
226    pub label: Option<String>,
227    pub branch: Option<String>,
228}
229
230#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
231pub struct WorkflowBundleGraphDiagnostic {
232    pub severity: String,
233    pub path: String,
234    pub message: String,
235    pub node_id: Option<String>,
236    pub graph_node_id: Option<String>,
237}
238
239#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
240pub struct WorkflowBundleEditableField {
241    pub id: String,
242    pub label: String,
243    pub json_pointer: String,
244    pub value_type: String,
245    pub required: bool,
246    pub enum_values: Vec<String>,
247}
248
249#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
250#[serde(default)]
251pub struct WorkflowBundleRunRequest {
252    pub trigger_id: Option<String>,
253    pub event_id: Option<String>,
254}
255
256#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
257pub struct WorkflowBundleRunReceipt {
258    pub schema_version: u32,
259    pub receipt_type: String,
260    pub bundle_id: String,
261    pub bundle_version: String,
262    pub workflow_id: String,
263    pub workflow_version: usize,
264    pub graph_digest: String,
265    pub run_id: String,
266    pub trigger_id: Option<String>,
267    pub event_ids: Vec<String>,
268    pub status: String,
269    pub executed_nodes: Vec<WorkflowBundleRunNodeReceipt>,
270    pub policy: WorkflowBundlePolicy,
271    pub connectors: Vec<ConnectorRequirement>,
272    pub environment: EnvironmentRequirements,
273}
274
275#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
276pub struct WorkflowBundleRunNodeReceipt {
277    pub node_id: String,
278    pub kind: String,
279    pub prompt_capsule: Option<String>,
280    pub status: String,
281}
282
/// Error produced while loading a workflow bundle.
///
/// Holds only a human-readable message; the originating error is flattened
/// to its `Display` text by the `From` conversions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WorkflowBundleError {
    /// Description of what went wrong.
    pub message: String,
}

impl std::fmt::Display for WorkflowBundleError {
    /// Writes the message, honoring any width / alignment flags on `f`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Fully qualified on purpose: method-call syntax (`self.message.fmt(f)`)
        // only resolves when a `std::fmt` trait is in scope, and it turns
        // ambiguous as soon as both `Display` and `Debug` are imported.
        std::fmt::Display::fmt(&self.message, f)
    }
}

impl std::error::Error for WorkflowBundleError {}

impl From<std::io::Error> for WorkflowBundleError {
    /// Converts an I/O failure by keeping its display text as the message.
    fn from(error: std::io::Error) -> Self {
        Self {
            message: error.to_string(),
        }
    }
}
303
304impl From<serde_json::Error> for WorkflowBundleError {
305    fn from(error: serde_json::Error) -> Self {
306        Self {
307            message: error.to_string(),
308        }
309    }
310}
311
312pub fn load_workflow_bundle(path: &Path) -> Result<WorkflowBundle, WorkflowBundleError> {
313    let bytes = fs::read(path)?;
314    serde_json::from_slice(&bytes).map_err(Into::into)
315}
316
317pub fn workflow_graph_digest(graph: &WorkflowGraph) -> String {
318    let mut canonical = canonical_workflow_graph(graph);
319    canonical.audit_log.clear();
320    let bytes = serde_json::to_vec(&canonical).expect("workflow graph serializes");
321    let digest = Sha256::digest(bytes);
322    let hex = digest
323        .iter()
324        .map(|byte| format!("{byte:02x}"))
325        .collect::<String>();
326    format!("sha256:{hex}")
327}
328
329pub fn validate_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundleValidationReport {
330    let canonical = canonical_workflow_graph(&bundle.workflow);
331    let mut report = WorkflowBundleValidationReport {
332        valid: true,
333        bundle_id: bundle.id.clone(),
334        workflow_id: canonical.id.clone(),
335        graph_digest: workflow_graph_digest(&canonical),
336        errors: Vec::new(),
337        warnings: Vec::new(),
338    };
339
340    validate_bundle_identity(bundle, &canonical, &mut report);
341    validate_triggers(bundle, &canonical, &mut report);
342    validate_prompt_capsules(bundle, &canonical, &mut report);
343    validate_policy(bundle, &mut report);
344    validate_connectors(bundle, &mut report);
345    validate_environment(bundle, &mut report);
346
347    let graph_report = validate_workflow(&canonical, None);
348    for error in graph_report.errors {
349        let node_id = workflow_diagnostic_node_id(&error, &canonical);
350        push_error(&mut report, "workflow", error, node_id);
351    }
352    for warning in graph_report.warnings {
353        let node_id = workflow_diagnostic_node_id(&warning, &canonical);
354        push_warning(&mut report, "workflow", warning, node_id);
355    }
356
357    if let Some(expected) = bundle.receipts.graph_digest.as_deref() {
358        if expected != report.graph_digest {
359            let actual = report.graph_digest.clone();
360            push_error(
361                &mut report,
362                "receipts.graph_digest",
363                format!("graph digest mismatch: expected {expected}, computed {actual}"),
364                None,
365            );
366        }
367    }
368    if let Some(expected_version) = bundle.receipts.workflow_version {
369        if expected_version != canonical.version {
370            push_error(
371                &mut report,
372                "receipts.workflow_version",
373                format!(
374                    "workflow version mismatch: expected {expected_version}, computed {}",
375                    canonical.version
376                ),
377                None,
378            );
379        }
380    }
381
382    report.valid = report.errors.is_empty();
383    report
384}
385
386pub fn preview_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundlePreview {
387    let canonical = canonical_workflow_graph(&bundle.workflow);
388    let validation = validate_workflow_bundle(bundle);
389    let graph = export_workflow_bundle_graph(bundle, &validation);
390    let mermaid = graph.mermaid.clone();
391    let triggers_by_node = triggers_by_node(bundle);
392    let capsules_by_node = capsules_by_node(bundle);
393    let mut nodes = Vec::new();
394
395    for (node_id, node) in &canonical.nodes {
396        let mut outgoing = canonical
397            .edges
398            .iter()
399            .filter(|edge| edge.from == *node_id)
400            .map(|edge| edge.to.clone())
401            .collect::<Vec<_>>();
402        outgoing.sort();
403        outgoing.dedup();
404        nodes.push(WorkflowBundlePreviewNode {
405            id: node_id.clone(),
406            kind: node.kind.clone(),
407            label: node.task_label.clone(),
408            prompt_capsule: capsules_by_node.get(node_id).cloned(),
409            trigger_ids: triggers_by_node.get(node_id).cloned().unwrap_or_default(),
410            outgoing,
411        });
412    }
413
414    WorkflowBundlePreview {
415        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
416        bundle_id: bundle.id.clone(),
417        bundle_version: bundle.version.clone(),
418        workflow_id: canonical.id.clone(),
419        workflow_version: canonical.version,
420        graph_digest: validation.graph_digest.clone(),
421        validation,
422        graph,
423        mermaid,
424        triggers: bundle.triggers.clone(),
425        connectors: bundle.connectors.clone(),
426        environment: bundle.environment.clone(),
427        nodes,
428        edges: sorted_edges(&canonical),
429    }
430}
431
432pub fn export_workflow_bundle_graph(
433    bundle: &WorkflowBundle,
434    validation: &WorkflowBundleValidationReport,
435) -> WorkflowBundleGraphExport {
436    let canonical = canonical_workflow_graph(&bundle.workflow);
437    let mut nodes = Vec::new();
438    let mut edges = Vec::new();
439    let mut editable_fields = Vec::new();
440    let capsules_by_node = capsules_by_node(bundle);
441    let catchup_enabled = bundle.policy.catchup.mode != "none";
442    let retry_can_dlq = bundle.policy.retry.max_attempts > 1;
443
444    for (index, connector) in bundle.connectors.iter().enumerate() {
445        let node_fields = connector_editable_fields(index, connector);
446        editable_fields.extend(node_fields.clone());
447        nodes.push(WorkflowBundleGraphNode {
448            id: connector_graph_id(&connector.id),
449            node_type: "connector_call".to_string(),
450            label: connector_label(connector),
451            workflow_node_id: None,
452            trigger_id: None,
453            connector_id: Some(connector.id.clone()),
454            editable_fields: node_fields,
455            metadata: BTreeMap::from([
456                (
457                    "provider_id".to_string(),
458                    serde_json::json!(connector.provider_id),
459                ),
460                ("scopes".to_string(), serde_json::json!(connector.scopes)),
461            ]),
462        });
463    }
464
465    let catchup_fields = catchup_editable_fields();
466    let retry_fields = retry_editable_fields();
467    if catchup_enabled {
468        let node_fields = catchup_fields.clone();
469        editable_fields.extend(node_fields.clone());
470        nodes.push(WorkflowBundleGraphNode {
471            id: catchup_graph_id(),
472            node_type: "catchup".to_string(),
473            label: "Catch up".to_string(),
474            workflow_node_id: None,
475            trigger_id: None,
476            connector_id: None,
477            editable_fields: node_fields,
478            metadata: BTreeMap::from([(
479                "mode".to_string(),
480                serde_json::json!(bundle.policy.catchup.mode),
481            )]),
482        });
483    } else {
484        editable_fields.extend(catchup_fields);
485    }
486    if retry_can_dlq {
487        let node_fields = retry_fields.clone();
488        editable_fields.extend(node_fields.clone());
489        nodes.push(WorkflowBundleGraphNode {
490            id: dlq_graph_id(),
491            node_type: "dlq".to_string(),
492            label: "Dead letter queue".to_string(),
493            workflow_node_id: None,
494            trigger_id: None,
495            connector_id: None,
496            editable_fields: node_fields,
497            metadata: BTreeMap::from([(
498                "max_attempts".to_string(),
499                serde_json::json!(bundle.policy.retry.max_attempts),
500            )]),
501        });
502    } else {
503        editable_fields.extend(retry_fields);
504    }
505
506    for (index, trigger) in bundle.triggers.iter().enumerate() {
507        let node_fields = trigger_editable_fields(index, trigger);
508        editable_fields.extend(node_fields.clone());
509        nodes.push(WorkflowBundleGraphNode {
510            id: trigger_graph_id(&trigger.id),
511            node_type: "trigger".to_string(),
512            label: trigger_label(trigger),
513            workflow_node_id: trigger.node_id.clone(),
514            trigger_id: Some(trigger.id.clone()),
515            connector_id: None,
516            editable_fields: node_fields,
517            metadata: BTreeMap::from([
518                ("kind".to_string(), serde_json::json!(trigger.kind)),
519                ("provider".to_string(), serde_json::json!(trigger.provider)),
520                ("events".to_string(), serde_json::json!(trigger.events)),
521            ]),
522        });
523        if let Some(provider) = trigger.provider.as_deref() {
524            if let Some(connector) = bundle
525                .connectors
526                .iter()
527                .find(|connector| connector.provider_id == provider || connector.id == provider)
528            {
529                edges.push(WorkflowBundleGraphEdge {
530                    from: connector_graph_id(&connector.id),
531                    to: trigger_graph_id(&trigger.id),
532                    label: Some("binds".to_string()),
533                    branch: None,
534                });
535            }
536        }
537        let target = trigger
538            .node_id
539            .clone()
540            .unwrap_or_else(|| canonical.entry.clone());
541        if catchup_enabled {
542            edges.push(WorkflowBundleGraphEdge {
543                from: trigger_graph_id(&trigger.id),
544                to: catchup_graph_id(),
545                label: Some(bundle.policy.catchup.mode.clone()),
546                branch: Some("catchup".to_string()),
547            });
548            edges.push(WorkflowBundleGraphEdge {
549                from: catchup_graph_id(),
550                to: workflow_graph_id(&target),
551                label: Some("dispatch".to_string()),
552                branch: None,
553            });
554        } else {
555            edges.push(WorkflowBundleGraphEdge {
556                from: trigger_graph_id(&trigger.id),
557                to: workflow_graph_id(&target),
558                label: Some("dispatch".to_string()),
559                branch: None,
560            });
561        }
562    }
563
564    for (node_id, node) in &canonical.nodes {
565        let capsule_id = capsules_by_node.get(node_id);
566        let node_fields = workflow_node_editable_fields(node_id, capsule_id);
567        editable_fields.extend(node_fields.clone());
568        nodes.push(WorkflowBundleGraphNode {
569            id: workflow_graph_id(node_id),
570            node_type: workflow_node_type(&node.kind),
571            label: workflow_node_label(node_id, node),
572            workflow_node_id: Some(node_id.clone()),
573            trigger_id: None,
574            connector_id: None,
575            editable_fields: node_fields,
576            metadata: BTreeMap::from([
577                ("kind".to_string(), serde_json::json!(node.kind)),
578                ("task_label".to_string(), serde_json::json!(node.task_label)),
579                (
580                    "prompt_capsule".to_string(),
581                    serde_json::json!(capsule_id.cloned()),
582                ),
583            ]),
584        });
585    }
586
587    for edge in sorted_edges(&canonical) {
588        edges.push(WorkflowBundleGraphEdge {
589            from: workflow_graph_id(&edge.from),
590            to: workflow_graph_id(&edge.to),
591            label: edge.label.clone(),
592            branch: edge.branch.clone(),
593        });
594    }
595
596    let outgoing: BTreeSet<&str> = canonical
597        .edges
598        .iter()
599        .map(|edge| edge.from.as_str())
600        .collect();
601    for node_id in canonical.nodes.keys() {
602        if !outgoing.contains(node_id.as_str()) {
603            edges.push(WorkflowBundleGraphEdge {
604                from: workflow_graph_id(node_id),
605                to: terminal_completed_graph_id(),
606                label: Some("completed".to_string()),
607                branch: Some("completed".to_string()),
608            });
609        }
610        if retry_can_dlq {
611            edges.push(WorkflowBundleGraphEdge {
612                from: workflow_graph_id(node_id),
613                to: dlq_graph_id(),
614                label: Some("retry exhausted".to_string()),
615                branch: Some("failed".to_string()),
616            });
617        }
618    }
619
620    nodes.push(WorkflowBundleGraphNode {
621        id: terminal_completed_graph_id(),
622        node_type: "terminal".to_string(),
623        label: "Completed".to_string(),
624        workflow_node_id: None,
625        trigger_id: None,
626        connector_id: None,
627        editable_fields: Vec::new(),
628        metadata: BTreeMap::from([("status".to_string(), serde_json::json!("completed"))]),
629    });
630    nodes.push(WorkflowBundleGraphNode {
631        id: terminal_failed_graph_id(),
632        node_type: "terminal".to_string(),
633        label: "Failed".to_string(),
634        workflow_node_id: None,
635        trigger_id: None,
636        connector_id: None,
637        editable_fields: Vec::new(),
638        metadata: BTreeMap::from([("status".to_string(), serde_json::json!("failed"))]),
639    });
640    if retry_can_dlq {
641        edges.push(WorkflowBundleGraphEdge {
642            from: dlq_graph_id(),
643            to: terminal_failed_graph_id(),
644            label: Some("failed".to_string()),
645            branch: Some("failed".to_string()),
646        });
647    }
648
649    nodes.sort_by(|left, right| left.id.cmp(&right.id));
650    edges.sort_by(|left, right| {
651        (&left.from, &left.to, &left.branch, &left.label).cmp(&(
652            &right.from,
653            &right.to,
654            &right.branch,
655            &right.label,
656        ))
657    });
658    editable_fields.sort_by(|left, right| left.id.cmp(&right.id));
659
660    let diagnostics = validation
661        .errors
662        .iter()
663        .chain(validation.warnings.iter())
664        .map(|diagnostic| WorkflowBundleGraphDiagnostic {
665            severity: diagnostic.severity.clone(),
666            path: diagnostic.path.clone(),
667            message: diagnostic.message.clone(),
668            node_id: diagnostic.node_id.clone(),
669            graph_node_id: diagnostic.node_id.as_deref().map(workflow_graph_id),
670        })
671        .collect::<Vec<_>>();
672    let mermaid = render_workflow_bundle_mermaid(&nodes, &edges);
673
674    WorkflowBundleGraphExport {
675        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
676        graph_id: canonical.id,
677        graph_digest: validation.graph_digest.clone(),
678        nodes,
679        edges,
680        diagnostics,
681        editable_fields,
682        mermaid,
683    }
684}
685
686pub fn run_workflow_bundle(
687    bundle: &WorkflowBundle,
688    request: WorkflowBundleRunRequest,
689) -> Result<WorkflowBundleRunReceipt, WorkflowBundleValidationReport> {
690    let validation = validate_workflow_bundle(bundle);
691    if !validation.valid {
692        return Err(validation);
693    }
694
695    let canonical = canonical_workflow_graph(&bundle.workflow);
696    let trigger_id = match request.trigger_id {
697        Some(trigger_id)
698            if !bundle
699                .triggers
700                .iter()
701                .any(|trigger| trigger.id == trigger_id) =>
702        {
703            let mut report = validation;
704            push_error(
705                &mut report,
706                "trigger_id",
707                format!("unknown trigger id: {trigger_id}"),
708                None,
709            );
710            report.valid = false;
711            return Err(report);
712        }
713        Some(trigger_id) => Some(trigger_id),
714        None => bundle.triggers.first().map(|trigger| trigger.id.clone()),
715    };
716    let mut event_ids = bundle.receipts.event_ids.clone();
717    if let Some(event_id) = request.event_id {
718        if !event_ids.contains(&event_id) {
719            event_ids.push(event_id);
720        }
721    }
722    let run_id = bundle
723        .receipts
724        .run_id
725        .clone()
726        .unwrap_or_else(|| default_run_id(bundle, &validation.graph_digest));
727    let capsules_by_node = capsules_by_node(bundle);
728    let executed_nodes = execution_order(&canonical)
729        .into_iter()
730        .map(|node_id| {
731            let node = canonical
732                .nodes
733                .get(&node_id)
734                .expect("execution order only contains known nodes");
735            WorkflowBundleRunNodeReceipt {
736                node_id: node_id.clone(),
737                kind: node.kind.clone(),
738                prompt_capsule: capsules_by_node.get(&node_id).cloned(),
739                status: "completed".to_string(),
740            }
741        })
742        .collect();
743
744    Ok(WorkflowBundleRunReceipt {
745        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
746        receipt_type: WORKFLOW_BUNDLE_RECEIPT_TYPE.to_string(),
747        bundle_id: bundle.id.clone(),
748        bundle_version: bundle.version.clone(),
749        workflow_id: canonical.id,
750        workflow_version: canonical.version,
751        graph_digest: validation.graph_digest,
752        run_id,
753        trigger_id,
754        event_ids,
755        status: "completed".to_string(),
756        executed_nodes,
757        policy: bundle.policy.clone(),
758        connectors: bundle.connectors.clone(),
759        environment: bundle.environment.clone(),
760    })
761}
762
763fn canonical_workflow_graph(graph: &WorkflowGraph) -> WorkflowGraph {
764    let mut canonical = graph.clone();
765    if canonical.type_name.is_empty() {
766        canonical.type_name = "workflow_graph".to_string();
767    }
768    if canonical.version == 0 {
769        canonical.version = 1;
770    }
771    if canonical.entry.is_empty() {
772        canonical.entry = canonical.nodes.keys().next().cloned().unwrap_or_default();
773    }
774    for (node_id, node) in &mut canonical.nodes {
775        if node.id.is_none() {
776            node.id = Some(node_id.clone());
777        }
778        if node.kind.is_empty() {
779            node.kind = "stage".to_string();
780        }
781        if node.retry_policy.max_attempts == 0 {
782            node.retry_policy.max_attempts = 1;
783        }
784    }
785    canonical.edges = sorted_edges(&canonical);
786    canonical
787}
788
789fn sorted_edges(graph: &WorkflowGraph) -> Vec<WorkflowEdge> {
790    let mut edges = graph.edges.clone();
791    edges.sort_by(|left, right| {
792        (
793            &left.from,
794            &left.to,
795            left.branch.as_deref(),
796            left.label.as_deref(),
797        )
798            .cmp(&(
799                &right.from,
800                &right.to,
801                right.branch.as_deref(),
802                right.label.as_deref(),
803            ))
804    });
805    edges
806}
807
808fn validate_bundle_identity(
809    bundle: &WorkflowBundle,
810    graph: &WorkflowGraph,
811    report: &mut WorkflowBundleValidationReport,
812) {
813    if bundle.schema_version != WORKFLOW_BUNDLE_SCHEMA_VERSION {
814        push_error(
815            report,
816            "schema_version",
817            format!(
818                "unsupported schema_version {}; expected {}",
819                bundle.schema_version, WORKFLOW_BUNDLE_SCHEMA_VERSION
820            ),
821            None,
822        );
823    }
824    if bundle.id.trim().is_empty() {
825        push_error(report, "id", "bundle id is required", None);
826    }
827    if bundle.version.trim().is_empty() {
828        push_error(report, "version", "bundle version is required", None);
829    }
830    if graph.id.trim().is_empty() {
831        push_error(
832            report,
833            "workflow.id",
834            "workflow id is required for portable bundles",
835            None,
836        );
837    }
838    if graph.nodes.is_empty() {
839        push_error(
840            report,
841            "workflow.nodes",
842            "workflow must contain nodes",
843            None,
844        );
845    }
846    for (node_id, node) in &graph.nodes {
847        if node_id.trim().is_empty() {
848            push_error(report, "workflow.nodes", "node id is required", None);
849        }
850        if node.id.as_deref().is_some_and(|id| id != node_id) {
851            push_error(
852                report,
853                format!("workflow.nodes.{node_id}.id"),
854                "node id field must match its map key",
855                Some(node_id.clone()),
856            );
857        }
858    }
859}
860
861fn validate_triggers(
862    bundle: &WorkflowBundle,
863    graph: &WorkflowGraph,
864    report: &mut WorkflowBundleValidationReport,
865) {
866    if bundle.triggers.is_empty() {
867        push_warning(report, "triggers", "bundle declares no triggers", None);
868    }
869    let mut ids = BTreeSet::new();
870    for (index, trigger) in bundle.triggers.iter().enumerate() {
871        let path = format!("triggers[{index}]");
872        if trigger.id.trim().is_empty() {
873            push_error(report, format!("{path}.id"), "trigger id is required", None);
874        } else if !ids.insert(trigger.id.clone()) {
875            push_error(
876                report,
877                format!("{path}.id"),
878                format!("duplicate trigger id: {}", trigger.id),
879                None,
880            );
881        }
882        match trigger.kind.as_str() {
883            "github" => {
884                if trigger.provider.as_deref() != Some("github") {
885                    push_error(
886                        report,
887                        format!("{path}.provider"),
888                        "github triggers require provider=\"github\"",
889                        None,
890                    );
891                }
892                if trigger.events.is_empty() {
893                    push_error(
894                        report,
895                        format!("{path}.events"),
896                        "github triggers require at least one event",
897                        None,
898                    );
899                }
900            }
901            "cron" if trigger.schedule.is_none() => push_error(
902                report,
903                format!("{path}.schedule"),
904                "cron triggers require schedule",
905                None,
906            ),
907            "delay" if trigger.delay.is_none() => push_error(
908                report,
909                format!("{path}.delay"),
910                "delay triggers require delay",
911                None,
912            ),
913            "webhook" if trigger.webhook_path.is_none() => push_error(
914                report,
915                format!("{path}.webhook_path"),
916                "webhook triggers require webhook_path",
917                None,
918            ),
919            "mcp" if trigger.mcp_tool.is_none() => push_error(
920                report,
921                format!("{path}.mcp_tool"),
922                "mcp triggers require mcp_tool",
923                None,
924            ),
925            "manual" => {}
926            "" => push_error(
927                report,
928                format!("{path}.kind"),
929                "trigger kind is required",
930                None,
931            ),
932            other
933                if !matches!(
934                    other,
935                    "github" | "cron" | "delay" | "webhook" | "mcp" | "manual"
936                ) =>
937            {
938                push_error(
939                    report,
940                    format!("{path}.kind"),
941                    format!("unsupported trigger kind: {other}"),
942                    None,
943                );
944            }
945            _ => {}
946        }
947        if let Some(node_id) = trigger.node_id.as_deref() {
948            if !graph.nodes.contains_key(node_id) {
949                push_error(
950                    report,
951                    format!("{path}.node_id"),
952                    format!("trigger references unknown node: {node_id}"),
953                    Some(node_id.to_string()),
954                );
955            }
956        }
957    }
958}
959
960fn validate_prompt_capsules(
961    bundle: &WorkflowBundle,
962    graph: &WorkflowGraph,
963    report: &mut WorkflowBundleValidationReport,
964) {
965    let trigger_ids: BTreeSet<&str> = bundle
966        .triggers
967        .iter()
968        .map(|trigger| trigger.id.as_str())
969        .collect();
970    let mut node_refs = BTreeSet::new();
971    for (key, capsule) in &bundle.prompt_capsules {
972        let path = format!("prompt_capsules.{key}");
973        if capsule.id.trim().is_empty() {
974            push_error(
975                report,
976                format!("{path}.id"),
977                "prompt capsule id is required",
978                None,
979            );
980        } else if capsule.id != *key {
981            push_error(
982                report,
983                format!("{path}.id"),
984                "prompt capsule id must match its map key",
985                None,
986            );
987        }
988        if capsule.prompt.trim().is_empty() {
989            push_error(
990                report,
991                format!("{path}.prompt"),
992                "prompt capsule prompt is required",
993                Some(capsule.node_id.clone()),
994            );
995        }
996        if !graph.nodes.contains_key(&capsule.node_id) {
997            push_error(
998                report,
999                format!("{path}.node_id"),
1000                format!(
1001                    "prompt capsule references unknown node: {}",
1002                    capsule.node_id
1003                ),
1004                Some(capsule.node_id.clone()),
1005            );
1006        }
1007        if !capsule.node_id.is_empty() && !node_refs.insert(capsule.node_id.clone()) {
1008            push_error(
1009                report,
1010                format!("{path}.node_id"),
1011                format!("multiple prompt capsules target node {}", capsule.node_id),
1012                Some(capsule.node_id.clone()),
1013            );
1014        }
1015        if let Some(trigger_id) = capsule.trigger_id.as_deref() {
1016            if !trigger_ids.contains(trigger_id) {
1017                push_error(
1018                    report,
1019                    format!("{path}.trigger_id"),
1020                    format!("prompt capsule references unknown trigger: {trigger_id}"),
1021                    Some(capsule.node_id.clone()),
1022                );
1023            }
1024        }
1025    }
1026}
1027
1028fn validate_policy(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1029    if !matches!(
1030        bundle.policy.autonomy_tier.as_str(),
1031        "shadow" | "suggest" | "act_with_approval" | "act_auto"
1032    ) {
1033        push_error(
1034            report,
1035            "policy.autonomy_tier",
1036            "autonomy_tier must be shadow, suggest, act_with_approval, or act_auto",
1037            None,
1038        );
1039    }
1040    if bundle.policy.retry.max_attempts == 0 {
1041        push_error(
1042            report,
1043            "policy.retry.max_attempts",
1044            "retry.max_attempts must be at least 1",
1045            None,
1046        );
1047    }
1048    if !matches!(
1049        bundle.policy.catchup.mode.as_str(),
1050        "none" | "latest" | "all"
1051    ) {
1052        push_error(
1053            report,
1054            "policy.catchup.mode",
1055            "catchup.mode must be none, latest, or all",
1056            None,
1057        );
1058    }
1059}
1060
1061fn validate_connectors(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1062    let mut ids = BTreeSet::new();
1063    let provider_ids: BTreeSet<&str> = bundle
1064        .connectors
1065        .iter()
1066        .map(|connector| connector.provider_id.as_str())
1067        .collect();
1068    for (index, connector) in bundle.connectors.iter().enumerate() {
1069        let path = format!("connectors[{index}]");
1070        if connector.id.trim().is_empty() {
1071            push_error(
1072                report,
1073                format!("{path}.id"),
1074                "connector id is required",
1075                None,
1076            );
1077        } else if !ids.insert(connector.id.clone()) {
1078            push_error(
1079                report,
1080                format!("{path}.id"),
1081                format!("duplicate connector id: {}", connector.id),
1082                None,
1083            );
1084        }
1085        if connector.provider_id.trim().is_empty() {
1086            push_error(
1087                report,
1088                format!("{path}.provider_id"),
1089                "connector provider_id is required",
1090                None,
1091            );
1092        }
1093    }
1094    for trigger in &bundle.triggers {
1095        if let Some(provider) = trigger.provider.as_deref() {
1096            if !provider_ids.contains(provider) {
1097                push_warning(
1098                    report,
1099                    "connectors",
1100                    format!(
1101                        "trigger {} references provider {provider} with no connector requirement",
1102                        trigger.id
1103                    ),
1104                    trigger.node_id.clone(),
1105                );
1106            }
1107        }
1108    }
1109}
1110
1111fn validate_environment(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1112    if !matches!(
1113        bundle.environment.worktree_policy.as_str(),
1114        "reuse_current" | "new_worktree" | "host_managed"
1115    ) {
1116        push_error(
1117            report,
1118            "environment.worktree_policy",
1119            "worktree_policy must be reuse_current, new_worktree, or host_managed",
1120            None,
1121        );
1122    }
1123}
1124
1125fn workflow_diagnostic_node_id(message: &str, graph: &WorkflowGraph) -> Option<String> {
1126    for prefix in [
1127        "node is unreachable: ",
1128        "edge.from references unknown node: ",
1129        "edge.to references unknown node: ",
1130        "entry node does not exist: ",
1131    ] {
1132        if let Some(node_id) = message.strip_prefix(prefix) {
1133            return Some(node_id.to_string());
1134        }
1135    }
1136    if let Some(rest) = message.strip_prefix("node ") {
1137        if let Some((node_id, _)) = rest.split_once(':') {
1138            return Some(node_id.to_string());
1139        }
1140    }
1141    graph
1142        .nodes
1143        .keys()
1144        .find(|node_id| message.contains(&format!("node {node_id}:")))
1145        .cloned()
1146}
1147
/// Builds the graph identifier for a workflow node: `node/<node_id>`.
fn workflow_graph_id(node_id: &str) -> String {
    let mut graph_id = String::with_capacity("node/".len() + node_id.len());
    graph_id.push_str("node/");
    graph_id.push_str(node_id);
    graph_id
}
1151
/// Builds the graph identifier for a trigger: `trigger/<trigger_id>`.
fn trigger_graph_id(trigger_id: &str) -> String {
    ["trigger", trigger_id].join("/")
}
1155
/// Builds the graph identifier for a connector: `connector/<connector_id>`.
fn connector_graph_id(connector_id: &str) -> String {
    let mut graph_id = String::from("connector/");
    graph_id += connector_id;
    graph_id
}
1159
/// Returns the fixed graph identifier used for the catchup policy node.
fn catchup_graph_id() -> String {
    const CATCHUP_GRAPH_ID: &str = "policy/catchup";
    String::from(CATCHUP_GRAPH_ID)
}
1163
/// Returns the fixed graph identifier `policy/dlq`.
fn dlq_graph_id() -> String {
    const DLQ_GRAPH_ID: &str = "policy/dlq";
    String::from(DLQ_GRAPH_ID)
}
1167
/// Returns the fixed graph identifier `terminal/completed`.
fn terminal_completed_graph_id() -> String {
    const TERMINAL_COMPLETED_GRAPH_ID: &str = "terminal/completed";
    TERMINAL_COMPLETED_GRAPH_ID.to_owned()
}
1171
/// Returns the fixed graph identifier `terminal/failed`.
fn terminal_failed_graph_id() -> String {
    const TERMINAL_FAILED_GRAPH_ID: &str = "terminal/failed";
    TERMINAL_FAILED_GRAPH_ID.to_owned()
}
1175
/// Maps a workflow node `kind` onto the node type name used in the bundle graph.
///
/// Known aliases collapse onto a canonical name (for example `stage` becomes
/// `agent` and `worker` becomes `subagent`). A blank kind defaults to `agent`.
/// Any other value is returned unchanged, including surrounding whitespace.
fn workflow_node_type(kind: &str) -> String {
    // (alias, canonical type) pairs; matching is exact and case-sensitive.
    const ALIASES: &[(&str, &str)] = &[
        ("action", "action"),
        ("stage", "agent"),
        ("agent", "agent"),
        ("subagent", "subagent"),
        ("worker", "subagent"),
        ("wait", "wait"),
        ("waitpoint", "wait"),
        ("delay", "wait"),
        ("approval", "approval"),
        ("hitl", "approval"),
        ("connector", "connector_call"),
        ("connector_call", "connector_call"),
        ("notification", "notification"),
        ("notify", "notification"),
        ("terminal", "terminal"),
        ("success", "terminal"),
        ("failure", "terminal"),
    ];

    let canonical = ALIASES
        .iter()
        .find(|(alias, _)| *alias == kind)
        .map(|(_, node_type)| *node_type);
    match canonical {
        Some(node_type) => node_type.to_owned(),
        None if kind.trim().is_empty() => "agent".to_owned(),
        None => kind.to_owned(),
    }
}
1191
1192fn workflow_node_label(node_id: &str, node: &super::WorkflowNode) -> String {
1193    node.task_label
1194        .clone()
1195        .or_else(|| node.prompt.clone())
1196        .map(|label| label.trim().to_string())
1197        .filter(|label| !label.is_empty())
1198        .unwrap_or_else(|| node_id.to_string())
1199}
1200
1201fn trigger_label(trigger: &WorkflowBundleTrigger) -> String {
1202    if !trigger.events.is_empty() {
1203        format!("{}: {}", trigger.kind, trigger.events.join(", "))
1204    } else if let Some(schedule) = trigger.schedule.as_deref() {
1205        format!("cron: {schedule}")
1206    } else if let Some(delay) = trigger.delay.as_deref() {
1207        format!("delay: {delay}")
1208    } else {
1209        trigger.id.clone()
1210    }
1211}
1212
1213fn connector_label(connector: &ConnectorRequirement) -> String {
1214    if connector.provider_id.is_empty() || connector.provider_id == connector.id {
1215        connector.id.clone()
1216    } else {
1217        format!("{} ({})", connector.id, connector.provider_id)
1218    }
1219}
1220
1221fn editable_field(
1222    id: impl Into<String>,
1223    label: impl Into<String>,
1224    json_pointer: impl Into<String>,
1225    value_type: impl Into<String>,
1226    required: bool,
1227    enum_values: &[&str],
1228) -> WorkflowBundleEditableField {
1229    WorkflowBundleEditableField {
1230        id: id.into(),
1231        label: label.into(),
1232        json_pointer: json_pointer.into(),
1233        value_type: value_type.into(),
1234        required,
1235        enum_values: enum_values
1236            .iter()
1237            .map(|value| (*value).to_string())
1238            .collect(),
1239    }
1240}
1241
/// Escapes `value` for use as a single JSON Pointer reference token:
/// `~` becomes `~0` and `/` becomes `~1`; every other character is kept.
fn json_pointer_segment(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '~' => escaped.push_str("~0"),
            '/' => escaped.push_str("~1"),
            other => escaped.push(other),
        }
    }
    escaped
}
1245
1246fn trigger_editable_fields(
1247    index: usize,
1248    trigger: &WorkflowBundleTrigger,
1249) -> Vec<WorkflowBundleEditableField> {
1250    let base = format!("/triggers/{index}");
1251    let mut fields = vec![
1252        editable_field(
1253            format!("trigger.{}.kind", trigger.id),
1254            "Trigger kind",
1255            format!("{base}/kind"),
1256            "enum",
1257            true,
1258            &["github", "cron", "delay", "manual", "webhook", "mcp"],
1259        ),
1260        editable_field(
1261            format!("trigger.{}.node_id", trigger.id),
1262            "Target node",
1263            format!("{base}/node_id"),
1264            "string",
1265            false,
1266            &[],
1267        ),
1268    ];
1269    if trigger.provider.is_some() || trigger.kind == "github" {
1270        fields.push(editable_field(
1271            format!("trigger.{}.provider", trigger.id),
1272            "Provider",
1273            format!("{base}/provider"),
1274            "string",
1275            trigger.kind == "github",
1276            &[],
1277        ));
1278    }
1279    for (field, label, value_type) in [
1280        ("events", "Events", "list"),
1281        ("schedule", "Schedule", "string"),
1282        ("delay", "Delay", "string"),
1283        ("webhook_path", "Webhook path", "string"),
1284        ("mcp_tool", "MCP tool", "string"),
1285        ("resume_key", "Resume key", "string"),
1286        ("metadata", "Metadata", "object"),
1287    ] {
1288        fields.push(editable_field(
1289            format!("trigger.{}.{}", trigger.id, field),
1290            label,
1291            format!("{base}/{field}"),
1292            value_type,
1293            false,
1294            &[],
1295        ));
1296    }
1297    fields
1298}
1299
1300fn workflow_node_editable_fields(
1301    node_id: &str,
1302    capsule_id: Option<&String>,
1303) -> Vec<WorkflowBundleEditableField> {
1304    let escaped_node = json_pointer_segment(node_id);
1305    let mut fields = vec![
1306        editable_field(
1307            format!("workflow.{node_id}.task_label"),
1308            "Task label",
1309            format!("/workflow/nodes/{escaped_node}/task_label"),
1310            "string",
1311            false,
1312            &[],
1313        ),
1314        editable_field(
1315            format!("workflow.{node_id}.prompt"),
1316            "Prompt",
1317            format!("/workflow/nodes/{escaped_node}/prompt"),
1318            "string",
1319            false,
1320            &[],
1321        ),
1322        editable_field(
1323            format!("workflow.{node_id}.system"),
1324            "System prompt",
1325            format!("/workflow/nodes/{escaped_node}/system"),
1326            "string",
1327            false,
1328            &[],
1329        ),
1330        editable_field(
1331            format!("workflow.{node_id}.model_policy"),
1332            "Model policy",
1333            format!("/workflow/nodes/{escaped_node}/model_policy"),
1334            "object",
1335            false,
1336            &[],
1337        ),
1338        editable_field(
1339            format!("workflow.{node_id}.tools"),
1340            "Tool policy",
1341            format!("/workflow/nodes/{escaped_node}/tools"),
1342            "any",
1343            false,
1344            &[],
1345        ),
1346        editable_field(
1347            format!("workflow.{node_id}.capability_policy"),
1348            "Capability policy",
1349            format!("/workflow/nodes/{escaped_node}/capability_policy"),
1350            "object",
1351            false,
1352            &[],
1353        ),
1354        editable_field(
1355            format!("workflow.{node_id}.approval_policy"),
1356            "Approval policy",
1357            format!("/workflow/nodes/{escaped_node}/approval_policy"),
1358            "object",
1359            false,
1360            &[],
1361        ),
1362        editable_field(
1363            format!("workflow.{node_id}.retry_policy"),
1364            "Retry policy",
1365            format!("/workflow/nodes/{escaped_node}/retry_policy"),
1366            "object",
1367            false,
1368            &[],
1369        ),
1370    ];
1371    if let Some(capsule_id) = capsule_id {
1372        let escaped_capsule = json_pointer_segment(capsule_id);
1373        fields.extend([
1374            editable_field(
1375                format!("prompt_capsule.{capsule_id}.prompt"),
1376                "Prompt capsule",
1377                format!("/prompt_capsules/{escaped_capsule}/prompt"),
1378                "string",
1379                true,
1380                &[],
1381            ),
1382            editable_field(
1383                format!("prompt_capsule.{capsule_id}.system"),
1384                "Prompt capsule system",
1385                format!("/prompt_capsules/{escaped_capsule}/system"),
1386                "string",
1387                false,
1388                &[],
1389            ),
1390            editable_field(
1391                format!("prompt_capsule.{capsule_id}.context"),
1392                "Prompt capsule context",
1393                format!("/prompt_capsules/{escaped_capsule}/context"),
1394                "object",
1395                false,
1396                &[],
1397            ),
1398            editable_field(
1399                format!("prompt_capsule.{capsule_id}.trigger_id"),
1400                "Prompt capsule trigger",
1401                format!("/prompt_capsules/{escaped_capsule}/trigger_id"),
1402                "string",
1403                false,
1404                &[],
1405            ),
1406        ]);
1407    }
1408    fields
1409}
1410
1411fn connector_editable_fields(
1412    index: usize,
1413    connector: &ConnectorRequirement,
1414) -> Vec<WorkflowBundleEditableField> {
1415    let base = format!("/connectors/{index}");
1416    [
1417        ("id", "Connector id", "string", true),
1418        ("provider_id", "Provider id", "string", true),
1419        ("scopes", "Scopes", "list", false),
1420        ("setup_required", "Setup required", "bool", false),
1421        ("status_required", "Status required", "bool", false),
1422    ]
1423    .into_iter()
1424    .map(|(field, label, value_type, required)| {
1425        editable_field(
1426            format!("connector.{}.{}", connector.id, field),
1427            label,
1428            format!("{base}/{field}"),
1429            value_type,
1430            required,
1431            &[],
1432        )
1433    })
1434    .collect()
1435}
1436
1437fn retry_editable_fields() -> Vec<WorkflowBundleEditableField> {
1438    vec![
1439        editable_field(
1440            "policy.retry.max_attempts",
1441            "Retry attempts",
1442            "/policy/retry/max_attempts",
1443            "integer",
1444            true,
1445            &[],
1446        ),
1447        editable_field(
1448            "policy.retry.backoff",
1449            "Retry backoff",
1450            "/policy/retry/backoff",
1451            "string",
1452            true,
1453            &[],
1454        ),
1455    ]
1456}
1457
1458fn catchup_editable_fields() -> Vec<WorkflowBundleEditableField> {
1459    vec![
1460        editable_field(
1461            "policy.catchup.mode",
1462            "Catchup mode",
1463            "/policy/catchup/mode",
1464            "enum",
1465            true,
1466            &["none", "latest", "all"],
1467        ),
1468        editable_field(
1469            "policy.catchup.max_events",
1470            "Catchup max events",
1471            "/policy/catchup/max_events",
1472            "integer",
1473            false,
1474            &[],
1475        ),
1476    ]
1477}
1478
1479fn render_workflow_bundle_mermaid(
1480    nodes: &[WorkflowBundleGraphNode],
1481    edges: &[WorkflowBundleGraphEdge],
1482) -> String {
1483    let mut lines = vec!["flowchart TD".to_string()];
1484    for node in nodes {
1485        lines.push(format!(
1486            "  {}[\"{}\"]",
1487            mermaid_id(&node.id),
1488            mermaid_label(&format!("{}: {}", node.node_type, node.label))
1489        ));
1490    }
1491    for edge in edges {
1492        let label = edge
1493            .label
1494            .as_deref()
1495            .or(edge.branch.as_deref())
1496            .map(mermaid_label);
1497        match label {
1498            Some(label) if !label.is_empty() => lines.push(format!(
1499                "  {} -->|{}| {}",
1500                mermaid_id(&edge.from),
1501                label,
1502                mermaid_id(&edge.to)
1503            )),
1504            _ => lines.push(format!(
1505                "  {} --> {}",
1506                mermaid_id(&edge.from),
1507                mermaid_id(&edge.to)
1508            )),
1509        }
1510    }
1511    lines.join("\n")
1512}
1513
1514fn mermaid_id(value: &str) -> String {
1515    let digest = Sha256::digest(value.as_bytes());
1516    let suffix = digest
1517        .iter()
1518        .take(4)
1519        .map(|byte| format!("{byte:02x}"))
1520        .collect::<String>();
1521    let mut out = format!("n_{suffix}_");
1522    for ch in value.chars() {
1523        if ch.is_ascii_alphanumeric() {
1524            out.push(ch);
1525        } else {
1526            out.push('_');
1527        }
1528    }
1529    out
1530}
1531
/// Escapes text so it can be embedded in a Mermaid flowchart label.
///
/// Mermaid does not understand backslash escapes: inside a quoted label a
/// `\"` still terminates the string, and a doubled backslash is rendered as
/// two backslashes. Double quotes are therefore emitted as the `#quot;` entity
/// code, which Mermaid decodes back to `"` when rendering, and backslashes are
/// passed through untouched. Newlines are flattened to spaces because each
/// Mermaid statement must stay on one line.
fn mermaid_label(value: &str) -> String {
    value.replace('"', "#quot;").replace('\n', " ")
}
1538
1539fn triggers_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, Vec<String>> {
1540    let mut by_node: BTreeMap<String, Vec<String>> = BTreeMap::new();
1541    for trigger in &bundle.triggers {
1542        if let Some(node_id) = trigger.node_id.as_ref() {
1543            by_node
1544                .entry(node_id.clone())
1545                .or_default()
1546                .push(trigger.id.clone());
1547        }
1548    }
1549    by_node
1550}
1551
1552fn capsules_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, String> {
1553    bundle
1554        .prompt_capsules
1555        .iter()
1556        .map(|(id, capsule)| (capsule.node_id.clone(), id.clone()))
1557        .collect()
1558}
1559
1560fn execution_order(graph: &WorkflowGraph) -> Vec<String> {
1561    let outgoing =
1562        graph
1563            .edges
1564            .iter()
1565            .fold(BTreeMap::<String, Vec<String>>::new(), |mut acc, edge| {
1566                acc.entry(edge.from.clone())
1567                    .or_default()
1568                    .push(edge.to.clone());
1569                acc
1570            });
1571    let mut seen = BTreeSet::new();
1572    let mut queue = VecDeque::from([graph.entry.clone()]);
1573    let mut order = Vec::new();
1574    while let Some(node_id) = queue.pop_front() {
1575        if !graph.nodes.contains_key(&node_id) || !seen.insert(node_id.clone()) {
1576            continue;
1577        }
1578        order.push(node_id.clone());
1579        if let Some(next) = outgoing.get(&node_id) {
1580            let mut next = next.clone();
1581            next.sort();
1582            for child in next {
1583                queue.push_back(child);
1584            }
1585        }
1586    }
1587    order
1588}
1589
1590fn default_run_id(bundle: &WorkflowBundle, graph_digest: &str) -> String {
1591    let suffix = graph_digest
1592        .strip_prefix("sha256:")
1593        .unwrap_or(graph_digest)
1594        .chars()
1595        .take(12)
1596        .collect::<String>();
1597    format!("bundle_run_{}_{}", sanitize_id(&bundle.id), suffix)
1598}
1599
/// Replaces every character of `value` that is not an ASCII letter, ASCII
/// digit, `_`, or `-` with `_`. The result has one character per input
/// character.
fn sanitize_id(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-');
        cleaned.push(if allowed { ch } else { '_' });
    }
    cleaned
}
1612
1613fn push_error(
1614    report: &mut WorkflowBundleValidationReport,
1615    path: impl Into<String>,
1616    message: impl Into<String>,
1617    node_id: Option<String>,
1618) {
1619    report.errors.push(WorkflowBundleDiagnostic {
1620        severity: "error".to_string(),
1621        path: path.into(),
1622        message: message.into(),
1623        node_id,
1624    });
1625}
1626
1627fn push_warning(
1628    report: &mut WorkflowBundleValidationReport,
1629    path: impl Into<String>,
1630    message: impl Into<String>,
1631    node_id: Option<String>,
1632) {
1633    report.warnings.push(WorkflowBundleDiagnostic {
1634        severity: "warning".to_string(),
1635        path: path.into(),
1636        message: message.into(),
1637        node_id,
1638    });
1639}
1640
// Unit tests for this module are loaded from the sibling file
// `workflow_bundle_tests.rs` and are compiled only for test builds.
#[cfg(test)]
#[path = "workflow_bundle_tests.rs"]
mod workflow_bundle_tests;