Skip to main content

harn_vm/orchestration/
workflow_bundle.rs

1//! Portable workflow bundle contract and deterministic local receipts.
2
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::fs;
5use std::path::Path;
6
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9
10use super::{validate_workflow, WorkflowEdge, WorkflowGraph};
11
12pub const WORKFLOW_BUNDLE_SCHEMA_VERSION: u32 = 1;
13pub const WORKFLOW_BUNDLE_RECEIPT_TYPE: &str = "harn.workflow_bundle.run";
14
15#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
16#[serde(default)]
17pub struct WorkflowBundle {
18    pub schema_version: u32,
19    pub id: String,
20    pub name: Option<String>,
21    pub version: String,
22    pub triggers: Vec<WorkflowBundleTrigger>,
23    pub workflow: WorkflowGraph,
24    pub prompt_capsules: BTreeMap<String, PromptCapsule>,
25    pub policy: WorkflowBundlePolicy,
26    pub connectors: Vec<ConnectorRequirement>,
27    pub environment: EnvironmentRequirements,
28    pub receipts: WorkflowBundleReplayMetadata,
29    pub metadata: BTreeMap<String, serde_json::Value>,
30}
31
32#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
33#[serde(default)]
34pub struct WorkflowBundleTrigger {
35    pub id: String,
36    pub kind: String,
37    pub provider: Option<String>,
38    pub events: Vec<String>,
39    pub schedule: Option<String>,
40    pub delay: Option<String>,
41    pub webhook_path: Option<String>,
42    pub mcp_tool: Option<String>,
43    pub resume_key: Option<String>,
44    pub node_id: Option<String>,
45    pub metadata: BTreeMap<String, String>,
46}
47
48#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
49#[serde(default)]
50pub struct PromptCapsule {
51    pub id: String,
52    pub node_id: String,
53    pub trigger_id: Option<String>,
54    pub prompt: String,
55    pub system: Option<String>,
56    pub context: BTreeMap<String, serde_json::Value>,
57}
58
59#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
60#[serde(default)]
61pub struct WorkflowBundlePolicy {
62    pub autonomy_tier: String,
63    pub tool_policy: BTreeMap<String, serde_json::Value>,
64    pub approval_required: Vec<String>,
65    pub retry: RetryPolicySpec,
66    pub catchup: CatchupPolicySpec,
67}
68
69impl Default for WorkflowBundlePolicy {
70    fn default() -> Self {
71        Self {
72            autonomy_tier: "act_with_approval".to_string(),
73            tool_policy: BTreeMap::new(),
74            approval_required: Vec::new(),
75            retry: RetryPolicySpec::default(),
76            catchup: CatchupPolicySpec::default(),
77        }
78    }
79}
80
81#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
82#[serde(default)]
83pub struct RetryPolicySpec {
84    pub max_attempts: u32,
85    pub backoff: String,
86}
87
88impl Default for RetryPolicySpec {
89    fn default() -> Self {
90        Self {
91            max_attempts: 1,
92            backoff: "none".to_string(),
93        }
94    }
95}
96
97#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
98#[serde(default)]
99pub struct CatchupPolicySpec {
100    pub mode: String,
101    pub max_events: Option<u32>,
102}
103
104impl Default for CatchupPolicySpec {
105    fn default() -> Self {
106        Self {
107            mode: "latest".to_string(),
108            max_events: Some(1),
109        }
110    }
111}
112
113#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
114#[serde(default)]
115pub struct ConnectorRequirement {
116    pub id: String,
117    pub provider_id: String,
118    pub scopes: Vec<String>,
119    pub setup_required: bool,
120    pub status_required: bool,
121}
122
123#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
124#[serde(default)]
125pub struct EnvironmentRequirements {
126    pub repo_setup_profile: Option<String>,
127    pub worktree_policy: String,
128    pub command_gates: Vec<String>,
129}
130
131impl Default for EnvironmentRequirements {
132    fn default() -> Self {
133        Self {
134            repo_setup_profile: None,
135            worktree_policy: "host_managed".to_string(),
136            command_gates: Vec::new(),
137        }
138    }
139}
140
141#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
142#[serde(default)]
143pub struct WorkflowBundleReplayMetadata {
144    pub run_id: Option<String>,
145    pub event_ids: Vec<String>,
146    pub workflow_version: Option<usize>,
147    pub graph_digest: Option<String>,
148}
149
150#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
151#[serde(default)]
152pub struct WorkflowBundleDiagnostic {
153    pub severity: String,
154    pub path: String,
155    pub message: String,
156    pub node_id: Option<String>,
157}
158
159#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
160#[serde(default)]
161pub struct WorkflowBundleValidationReport {
162    pub valid: bool,
163    pub bundle_id: String,
164    pub workflow_id: String,
165    pub graph_digest: String,
166    pub errors: Vec<WorkflowBundleDiagnostic>,
167    pub warnings: Vec<WorkflowBundleDiagnostic>,
168}
169
170#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
171pub struct WorkflowBundlePreview {
172    pub schema_version: u32,
173    pub bundle_id: String,
174    pub bundle_version: String,
175    pub workflow_id: String,
176    pub workflow_version: usize,
177    pub graph_digest: String,
178    pub validation: WorkflowBundleValidationReport,
179    pub triggers: Vec<WorkflowBundleTrigger>,
180    pub connectors: Vec<ConnectorRequirement>,
181    pub environment: EnvironmentRequirements,
182    pub nodes: Vec<WorkflowBundlePreviewNode>,
183    pub edges: Vec<WorkflowEdge>,
184}
185
186#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
187pub struct WorkflowBundlePreviewNode {
188    pub id: String,
189    pub kind: String,
190    pub label: Option<String>,
191    pub prompt_capsule: Option<String>,
192    pub trigger_ids: Vec<String>,
193    pub outgoing: Vec<String>,
194}
195
196#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
197#[serde(default)]
198pub struct WorkflowBundleRunRequest {
199    pub trigger_id: Option<String>,
200    pub event_id: Option<String>,
201}
202
203#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
204pub struct WorkflowBundleRunReceipt {
205    pub schema_version: u32,
206    pub receipt_type: String,
207    pub bundle_id: String,
208    pub bundle_version: String,
209    pub workflow_id: String,
210    pub workflow_version: usize,
211    pub graph_digest: String,
212    pub run_id: String,
213    pub trigger_id: Option<String>,
214    pub event_ids: Vec<String>,
215    pub status: String,
216    pub executed_nodes: Vec<WorkflowBundleRunNodeReceipt>,
217    pub policy: WorkflowBundlePolicy,
218    pub connectors: Vec<ConnectorRequirement>,
219    pub environment: EnvironmentRequirements,
220}
221
222#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
223pub struct WorkflowBundleRunNodeReceipt {
224    pub node_id: String,
225    pub kind: String,
226    pub prompt_capsule: Option<String>,
227    pub status: String,
228}
229
230#[derive(Clone, Debug, PartialEq, Eq)]
231pub struct WorkflowBundleError {
232    pub message: String,
233}
234
235impl std::fmt::Display for WorkflowBundleError {
236    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        self.message.fmt(f)
238    }
239}
240
241impl std::error::Error for WorkflowBundleError {}
242
243impl From<std::io::Error> for WorkflowBundleError {
244    fn from(error: std::io::Error) -> Self {
245        Self {
246            message: error.to_string(),
247        }
248    }
249}
250
251impl From<serde_json::Error> for WorkflowBundleError {
252    fn from(error: serde_json::Error) -> Self {
253        Self {
254            message: error.to_string(),
255        }
256    }
257}
258
259pub fn load_workflow_bundle(path: &Path) -> Result<WorkflowBundle, WorkflowBundleError> {
260    let bytes = fs::read(path)?;
261    serde_json::from_slice(&bytes).map_err(Into::into)
262}
263
264pub fn workflow_graph_digest(graph: &WorkflowGraph) -> String {
265    let mut canonical = canonical_workflow_graph(graph);
266    canonical.audit_log.clear();
267    let bytes = serde_json::to_vec(&canonical).expect("workflow graph serializes");
268    let digest = Sha256::digest(bytes);
269    let hex = digest
270        .iter()
271        .map(|byte| format!("{byte:02x}"))
272        .collect::<String>();
273    format!("sha256:{hex}")
274}
275
276pub fn validate_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundleValidationReport {
277    let canonical = canonical_workflow_graph(&bundle.workflow);
278    let mut report = WorkflowBundleValidationReport {
279        valid: true,
280        bundle_id: bundle.id.clone(),
281        workflow_id: canonical.id.clone(),
282        graph_digest: workflow_graph_digest(&canonical),
283        errors: Vec::new(),
284        warnings: Vec::new(),
285    };
286
287    validate_bundle_identity(bundle, &canonical, &mut report);
288    validate_triggers(bundle, &canonical, &mut report);
289    validate_prompt_capsules(bundle, &canonical, &mut report);
290    validate_policy(bundle, &mut report);
291    validate_connectors(bundle, &mut report);
292    validate_environment(bundle, &mut report);
293
294    let graph_report = validate_workflow(&canonical, None);
295    for error in graph_report.errors {
296        push_error(&mut report, "workflow", error, None);
297    }
298    for warning in graph_report.warnings {
299        push_warning(&mut report, "workflow", warning, None);
300    }
301
302    if let Some(expected) = bundle.receipts.graph_digest.as_deref() {
303        if expected != report.graph_digest {
304            let actual = report.graph_digest.clone();
305            push_error(
306                &mut report,
307                "receipts.graph_digest",
308                format!("graph digest mismatch: expected {expected}, computed {actual}"),
309                None,
310            );
311        }
312    }
313    if let Some(expected_version) = bundle.receipts.workflow_version {
314        if expected_version != canonical.version {
315            push_error(
316                &mut report,
317                "receipts.workflow_version",
318                format!(
319                    "workflow version mismatch: expected {expected_version}, computed {}",
320                    canonical.version
321                ),
322                None,
323            );
324        }
325    }
326
327    report.valid = report.errors.is_empty();
328    report
329}
330
331pub fn preview_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundlePreview {
332    let canonical = canonical_workflow_graph(&bundle.workflow);
333    let validation = validate_workflow_bundle(bundle);
334    let triggers_by_node = triggers_by_node(bundle);
335    let capsules_by_node = capsules_by_node(bundle);
336    let mut nodes = Vec::new();
337
338    for (node_id, node) in &canonical.nodes {
339        let mut outgoing = canonical
340            .edges
341            .iter()
342            .filter(|edge| edge.from == *node_id)
343            .map(|edge| edge.to.clone())
344            .collect::<Vec<_>>();
345        outgoing.sort();
346        outgoing.dedup();
347        nodes.push(WorkflowBundlePreviewNode {
348            id: node_id.clone(),
349            kind: node.kind.clone(),
350            label: node.task_label.clone(),
351            prompt_capsule: capsules_by_node.get(node_id).cloned(),
352            trigger_ids: triggers_by_node.get(node_id).cloned().unwrap_or_default(),
353            outgoing,
354        });
355    }
356
357    WorkflowBundlePreview {
358        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
359        bundle_id: bundle.id.clone(),
360        bundle_version: bundle.version.clone(),
361        workflow_id: canonical.id.clone(),
362        workflow_version: canonical.version,
363        graph_digest: validation.graph_digest.clone(),
364        validation,
365        triggers: bundle.triggers.clone(),
366        connectors: bundle.connectors.clone(),
367        environment: bundle.environment.clone(),
368        nodes,
369        edges: sorted_edges(&canonical),
370    }
371}
372
373pub fn run_workflow_bundle(
374    bundle: &WorkflowBundle,
375    request: WorkflowBundleRunRequest,
376) -> Result<WorkflowBundleRunReceipt, WorkflowBundleValidationReport> {
377    let validation = validate_workflow_bundle(bundle);
378    if !validation.valid {
379        return Err(validation);
380    }
381
382    let canonical = canonical_workflow_graph(&bundle.workflow);
383    let trigger_id = match request.trigger_id {
384        Some(trigger_id)
385            if !bundle
386                .triggers
387                .iter()
388                .any(|trigger| trigger.id == trigger_id) =>
389        {
390            let mut report = validation;
391            push_error(
392                &mut report,
393                "trigger_id",
394                format!("unknown trigger id: {trigger_id}"),
395                None,
396            );
397            report.valid = false;
398            return Err(report);
399        }
400        Some(trigger_id) => Some(trigger_id),
401        None => bundle.triggers.first().map(|trigger| trigger.id.clone()),
402    };
403    let mut event_ids = bundle.receipts.event_ids.clone();
404    if let Some(event_id) = request.event_id {
405        if !event_ids.contains(&event_id) {
406            event_ids.push(event_id);
407        }
408    }
409    let run_id = bundle
410        .receipts
411        .run_id
412        .clone()
413        .unwrap_or_else(|| default_run_id(bundle, &validation.graph_digest));
414    let capsules_by_node = capsules_by_node(bundle);
415    let executed_nodes = execution_order(&canonical)
416        .into_iter()
417        .map(|node_id| {
418            let node = canonical
419                .nodes
420                .get(&node_id)
421                .expect("execution order only contains known nodes");
422            WorkflowBundleRunNodeReceipt {
423                node_id: node_id.clone(),
424                kind: node.kind.clone(),
425                prompt_capsule: capsules_by_node.get(&node_id).cloned(),
426                status: "completed".to_string(),
427            }
428        })
429        .collect();
430
431    Ok(WorkflowBundleRunReceipt {
432        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
433        receipt_type: WORKFLOW_BUNDLE_RECEIPT_TYPE.to_string(),
434        bundle_id: bundle.id.clone(),
435        bundle_version: bundle.version.clone(),
436        workflow_id: canonical.id,
437        workflow_version: canonical.version,
438        graph_digest: validation.graph_digest,
439        run_id,
440        trigger_id,
441        event_ids,
442        status: "completed".to_string(),
443        executed_nodes,
444        policy: bundle.policy.clone(),
445        connectors: bundle.connectors.clone(),
446        environment: bundle.environment.clone(),
447    })
448}
449
450fn canonical_workflow_graph(graph: &WorkflowGraph) -> WorkflowGraph {
451    let mut canonical = graph.clone();
452    if canonical.type_name.is_empty() {
453        canonical.type_name = "workflow_graph".to_string();
454    }
455    if canonical.version == 0 {
456        canonical.version = 1;
457    }
458    if canonical.entry.is_empty() {
459        canonical.entry = canonical.nodes.keys().next().cloned().unwrap_or_default();
460    }
461    for (node_id, node) in &mut canonical.nodes {
462        if node.id.is_none() {
463            node.id = Some(node_id.clone());
464        }
465        if node.kind.is_empty() {
466            node.kind = "stage".to_string();
467        }
468        if node.retry_policy.max_attempts == 0 {
469            node.retry_policy.max_attempts = 1;
470        }
471    }
472    canonical.edges = sorted_edges(&canonical);
473    canonical
474}
475
476fn sorted_edges(graph: &WorkflowGraph) -> Vec<WorkflowEdge> {
477    let mut edges = graph.edges.clone();
478    edges.sort_by(|left, right| {
479        (
480            &left.from,
481            &left.to,
482            left.branch.as_deref(),
483            left.label.as_deref(),
484        )
485            .cmp(&(
486                &right.from,
487                &right.to,
488                right.branch.as_deref(),
489                right.label.as_deref(),
490            ))
491    });
492    edges
493}
494
495fn validate_bundle_identity(
496    bundle: &WorkflowBundle,
497    graph: &WorkflowGraph,
498    report: &mut WorkflowBundleValidationReport,
499) {
500    if bundle.schema_version != WORKFLOW_BUNDLE_SCHEMA_VERSION {
501        push_error(
502            report,
503            "schema_version",
504            format!(
505                "unsupported schema_version {}; expected {}",
506                bundle.schema_version, WORKFLOW_BUNDLE_SCHEMA_VERSION
507            ),
508            None,
509        );
510    }
511    if bundle.id.trim().is_empty() {
512        push_error(report, "id", "bundle id is required", None);
513    }
514    if bundle.version.trim().is_empty() {
515        push_error(report, "version", "bundle version is required", None);
516    }
517    if graph.id.trim().is_empty() {
518        push_error(
519            report,
520            "workflow.id",
521            "workflow id is required for portable bundles",
522            None,
523        );
524    }
525    if graph.nodes.is_empty() {
526        push_error(
527            report,
528            "workflow.nodes",
529            "workflow must contain nodes",
530            None,
531        );
532    }
533    for (node_id, node) in &graph.nodes {
534        if node_id.trim().is_empty() {
535            push_error(report, "workflow.nodes", "node id is required", None);
536        }
537        if node.id.as_deref().is_some_and(|id| id != node_id) {
538            push_error(
539                report,
540                format!("workflow.nodes.{node_id}.id"),
541                "node id field must match its map key",
542                Some(node_id.clone()),
543            );
544        }
545    }
546}
547
548fn validate_triggers(
549    bundle: &WorkflowBundle,
550    graph: &WorkflowGraph,
551    report: &mut WorkflowBundleValidationReport,
552) {
553    if bundle.triggers.is_empty() {
554        push_warning(report, "triggers", "bundle declares no triggers", None);
555    }
556    let mut ids = BTreeSet::new();
557    for (index, trigger) in bundle.triggers.iter().enumerate() {
558        let path = format!("triggers[{index}]");
559        if trigger.id.trim().is_empty() {
560            push_error(report, format!("{path}.id"), "trigger id is required", None);
561        } else if !ids.insert(trigger.id.clone()) {
562            push_error(
563                report,
564                format!("{path}.id"),
565                format!("duplicate trigger id: {}", trigger.id),
566                None,
567            );
568        }
569        match trigger.kind.as_str() {
570            "github" => {
571                if trigger.provider.as_deref() != Some("github") {
572                    push_error(
573                        report,
574                        format!("{path}.provider"),
575                        "github triggers require provider=\"github\"",
576                        None,
577                    );
578                }
579                if trigger.events.is_empty() {
580                    push_error(
581                        report,
582                        format!("{path}.events"),
583                        "github triggers require at least one event",
584                        None,
585                    );
586                }
587            }
588            "cron" if trigger.schedule.is_none() => push_error(
589                report,
590                format!("{path}.schedule"),
591                "cron triggers require schedule",
592                None,
593            ),
594            "delay" if trigger.delay.is_none() => push_error(
595                report,
596                format!("{path}.delay"),
597                "delay triggers require delay",
598                None,
599            ),
600            "webhook" if trigger.webhook_path.is_none() => push_error(
601                report,
602                format!("{path}.webhook_path"),
603                "webhook triggers require webhook_path",
604                None,
605            ),
606            "mcp" if trigger.mcp_tool.is_none() => push_error(
607                report,
608                format!("{path}.mcp_tool"),
609                "mcp triggers require mcp_tool",
610                None,
611            ),
612            "manual" => {}
613            "" => push_error(
614                report,
615                format!("{path}.kind"),
616                "trigger kind is required",
617                None,
618            ),
619            other
620                if !matches!(
621                    other,
622                    "github" | "cron" | "delay" | "webhook" | "mcp" | "manual"
623                ) =>
624            {
625                push_error(
626                    report,
627                    format!("{path}.kind"),
628                    format!("unsupported trigger kind: {other}"),
629                    None,
630                );
631            }
632            _ => {}
633        }
634        if let Some(node_id) = trigger.node_id.as_deref() {
635            if !graph.nodes.contains_key(node_id) {
636                push_error(
637                    report,
638                    format!("{path}.node_id"),
639                    format!("trigger references unknown node: {node_id}"),
640                    Some(node_id.to_string()),
641                );
642            }
643        }
644    }
645}
646
647fn validate_prompt_capsules(
648    bundle: &WorkflowBundle,
649    graph: &WorkflowGraph,
650    report: &mut WorkflowBundleValidationReport,
651) {
652    let trigger_ids: BTreeSet<&str> = bundle
653        .triggers
654        .iter()
655        .map(|trigger| trigger.id.as_str())
656        .collect();
657    let mut node_refs = BTreeSet::new();
658    for (key, capsule) in &bundle.prompt_capsules {
659        let path = format!("prompt_capsules.{key}");
660        if capsule.id.trim().is_empty() {
661            push_error(
662                report,
663                format!("{path}.id"),
664                "prompt capsule id is required",
665                None,
666            );
667        } else if capsule.id != *key {
668            push_error(
669                report,
670                format!("{path}.id"),
671                "prompt capsule id must match its map key",
672                None,
673            );
674        }
675        if capsule.prompt.trim().is_empty() {
676            push_error(
677                report,
678                format!("{path}.prompt"),
679                "prompt capsule prompt is required",
680                Some(capsule.node_id.clone()),
681            );
682        }
683        if !graph.nodes.contains_key(&capsule.node_id) {
684            push_error(
685                report,
686                format!("{path}.node_id"),
687                format!(
688                    "prompt capsule references unknown node: {}",
689                    capsule.node_id
690                ),
691                Some(capsule.node_id.clone()),
692            );
693        }
694        if !capsule.node_id.is_empty() && !node_refs.insert(capsule.node_id.clone()) {
695            push_error(
696                report,
697                format!("{path}.node_id"),
698                format!("multiple prompt capsules target node {}", capsule.node_id),
699                Some(capsule.node_id.clone()),
700            );
701        }
702        if let Some(trigger_id) = capsule.trigger_id.as_deref() {
703            if !trigger_ids.contains(trigger_id) {
704                push_error(
705                    report,
706                    format!("{path}.trigger_id"),
707                    format!("prompt capsule references unknown trigger: {trigger_id}"),
708                    Some(capsule.node_id.clone()),
709                );
710            }
711        }
712    }
713}
714
715fn validate_policy(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
716    if !matches!(
717        bundle.policy.autonomy_tier.as_str(),
718        "shadow" | "suggest" | "act_with_approval" | "act_auto"
719    ) {
720        push_error(
721            report,
722            "policy.autonomy_tier",
723            "autonomy_tier must be shadow, suggest, act_with_approval, or act_auto",
724            None,
725        );
726    }
727    if bundle.policy.retry.max_attempts == 0 {
728        push_error(
729            report,
730            "policy.retry.max_attempts",
731            "retry.max_attempts must be at least 1",
732            None,
733        );
734    }
735    if !matches!(
736        bundle.policy.catchup.mode.as_str(),
737        "none" | "latest" | "all"
738    ) {
739        push_error(
740            report,
741            "policy.catchup.mode",
742            "catchup.mode must be none, latest, or all",
743            None,
744        );
745    }
746}
747
748fn validate_connectors(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
749    let mut ids = BTreeSet::new();
750    let provider_ids: BTreeSet<&str> = bundle
751        .connectors
752        .iter()
753        .map(|connector| connector.provider_id.as_str())
754        .collect();
755    for (index, connector) in bundle.connectors.iter().enumerate() {
756        let path = format!("connectors[{index}]");
757        if connector.id.trim().is_empty() {
758            push_error(
759                report,
760                format!("{path}.id"),
761                "connector id is required",
762                None,
763            );
764        } else if !ids.insert(connector.id.clone()) {
765            push_error(
766                report,
767                format!("{path}.id"),
768                format!("duplicate connector id: {}", connector.id),
769                None,
770            );
771        }
772        if connector.provider_id.trim().is_empty() {
773            push_error(
774                report,
775                format!("{path}.provider_id"),
776                "connector provider_id is required",
777                None,
778            );
779        }
780    }
781    for trigger in &bundle.triggers {
782        if let Some(provider) = trigger.provider.as_deref() {
783            if !provider_ids.contains(provider) {
784                push_warning(
785                    report,
786                    "connectors",
787                    format!(
788                        "trigger {} references provider {provider} with no connector requirement",
789                        trigger.id
790                    ),
791                    trigger.node_id.clone(),
792                );
793            }
794        }
795    }
796}
797
798fn validate_environment(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
799    if !matches!(
800        bundle.environment.worktree_policy.as_str(),
801        "reuse_current" | "new_worktree" | "host_managed"
802    ) {
803        push_error(
804            report,
805            "environment.worktree_policy",
806            "worktree_policy must be reuse_current, new_worktree, or host_managed",
807            None,
808        );
809    }
810}
811
812fn triggers_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, Vec<String>> {
813    let mut by_node: BTreeMap<String, Vec<String>> = BTreeMap::new();
814    for trigger in &bundle.triggers {
815        if let Some(node_id) = trigger.node_id.as_ref() {
816            by_node
817                .entry(node_id.clone())
818                .or_default()
819                .push(trigger.id.clone());
820        }
821    }
822    by_node
823}
824
825fn capsules_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, String> {
826    bundle
827        .prompt_capsules
828        .iter()
829        .map(|(id, capsule)| (capsule.node_id.clone(), id.clone()))
830        .collect()
831}
832
833fn execution_order(graph: &WorkflowGraph) -> Vec<String> {
834    let outgoing =
835        graph
836            .edges
837            .iter()
838            .fold(BTreeMap::<String, Vec<String>>::new(), |mut acc, edge| {
839                acc.entry(edge.from.clone())
840                    .or_default()
841                    .push(edge.to.clone());
842                acc
843            });
844    let mut seen = BTreeSet::new();
845    let mut queue = VecDeque::from([graph.entry.clone()]);
846    let mut order = Vec::new();
847    while let Some(node_id) = queue.pop_front() {
848        if !graph.nodes.contains_key(&node_id) || !seen.insert(node_id.clone()) {
849            continue;
850        }
851        order.push(node_id.clone());
852        if let Some(next) = outgoing.get(&node_id) {
853            let mut next = next.clone();
854            next.sort();
855            for child in next {
856                queue.push_back(child);
857            }
858        }
859    }
860    order
861}
862
863fn default_run_id(bundle: &WorkflowBundle, graph_digest: &str) -> String {
864    let suffix = graph_digest
865        .strip_prefix("sha256:")
866        .unwrap_or(graph_digest)
867        .chars()
868        .take(12)
869        .collect::<String>();
870    format!("bundle_run_{}_{}", sanitize_id(&bundle.id), suffix)
871}
872
873fn sanitize_id(value: &str) -> String {
874    value
875        .chars()
876        .map(|ch| {
877            if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' {
878                ch
879            } else {
880                '_'
881            }
882        })
883        .collect()
884}
885
886fn push_error(
887    report: &mut WorkflowBundleValidationReport,
888    path: impl Into<String>,
889    message: impl Into<String>,
890    node_id: Option<String>,
891) {
892    report.errors.push(WorkflowBundleDiagnostic {
893        severity: "error".to_string(),
894        path: path.into(),
895        message: message.into(),
896        node_id,
897    });
898}
899
900fn push_warning(
901    report: &mut WorkflowBundleValidationReport,
902    path: impl Into<String>,
903    message: impl Into<String>,
904    node_id: Option<String>,
905) {
906    report.warnings.push(WorkflowBundleDiagnostic {
907        severity: "warning".to_string(),
908        path: path.into(),
909        message: message.into(),
910        node_id,
911    });
912}
913
914#[cfg(test)]
915#[path = "workflow_bundle_tests.rs"]
916mod workflow_bundle_tests;