Skip to main content

harn_vm/orchestration/workflow_bundle/
mod.rs

1//! Portable workflow bundle contract, `.harnpack` container helpers, and
2//! deterministic local receipts.
3
4use std::collections::{BTreeMap, BTreeSet, VecDeque};
5use std::fs;
6use std::io::{Cursor, Read};
7use std::path::{Component, Path, PathBuf};
8
9use ed25519_dalek::{Signature, Verifier, VerifyingKey};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12
13use super::{validate_workflow, WorkflowEdge, WorkflowGraph};
14use crate::tool_annotations::ToolAnnotations;
15
/// Manifest schema version this build emits; the strict manifest parser in
/// this module rejects every other value.
pub const WORKFLOW_BUNDLE_SCHEMA_VERSION: u32 = 2;
/// Receipt type tag for workflow bundle runs (presumably the value stored in
/// `WorkflowBundleRunReceipt::receipt_type` — confirm against the runner).
pub const WORKFLOW_BUNDLE_RECEIPT_TYPE: &str = "harn.workflow_bundle.run";
/// Archive-relative path of the manifest inside a `.harnpack` container.
pub const HARNPACK_MANIFEST_PATH: &str = "harnpack.json";

/// File mode given to archive entries when the caller supplies none and used
/// as the fallback when a tar header's mode cannot be read.
const DEFAULT_HARNPACK_FILE_MODE: u32 = 0o644;

/// Ceiling (100 MiB) on the decompressed size of a `.harnpack` archive.
///
/// Matches the VM-level decompression cap in `stdlib/compression.rs`, so a
/// malicious bundle cannot exhaust memory during ingest (workflow bundles
/// take this exact path when harn-cloud accepts a plan transfer).
const MAX_HARNPACK_DECOMPRESSED_BYTES: u64 = 100 * 1024 * 1024;
28
29/// Decompress zstd-compressed harnpack bytes, refusing to produce more
30/// than [`MAX_HARNPACK_DECOMPRESSED_BYTES`] of output. Returns
31/// [`WorkflowBundleErrorKind::InvalidArchive`] on overflow so callers
32/// can surface a clean rejection instead of OOMing the host.
33fn decompress_harnpack_zstd(bytes: &[u8]) -> Result<Vec<u8>, WorkflowBundleError> {
34    let mut decoder = zstd::stream::Decoder::new(Cursor::new(bytes))?;
35    let mut output = Vec::new();
36    let mut limited = (&mut decoder).take(MAX_HARNPACK_DECOMPRESSED_BYTES.saturating_add(1));
37    limited.read_to_end(&mut output)?;
38    if output.len() as u64 > MAX_HARNPACK_DECOMPRESSED_BYTES {
39        return Err(WorkflowBundleError::new(
40            WorkflowBundleErrorKind::InvalidArchive,
41            format!(
42                "harnpack zstd payload decompressed past max size ({MAX_HARNPACK_DECOMPRESSED_BYTES} bytes); refusing to extract"
43            ),
44        ));
45    }
46    Ok(output)
47}
48
49#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
50#[serde(default)]
51pub struct WorkflowBundle {
52    pub schema_version: u32,
53    pub entrypoint: PathBuf,
54    pub transitive_modules: Vec<ModuleEntry>,
55    pub stdlib_version: String,
56    pub harn_version: String,
57    pub provider_catalog_hash: String,
58    pub tool_manifest: Vec<ToolEntry>,
59    pub sbom: SBOMDoc,
60    pub signature: Option<Ed25519Signature>,
61    pub parent_trust_record_id: Option<String>,
62    pub id: String,
63    pub name: Option<String>,
64    pub version: String,
65    pub triggers: Vec<WorkflowBundleTrigger>,
66    pub workflow: WorkflowGraph,
67    pub prompt_capsules: BTreeMap<String, PromptCapsule>,
68    pub policy: WorkflowBundlePolicy,
69    pub connectors: Vec<ConnectorRequirement>,
70    pub environment: EnvironmentRequirements,
71    pub receipts: WorkflowBundleReplayMetadata,
72    pub metadata: BTreeMap<String, serde_json::Value>,
73}
74
75impl Default for WorkflowBundle {
76    fn default() -> Self {
77        Self {
78            schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
79            entrypoint: PathBuf::new(),
80            transitive_modules: Vec::new(),
81            stdlib_version: env!("CARGO_PKG_VERSION").to_string(),
82            harn_version: env!("CARGO_PKG_VERSION").to_string(),
83            provider_catalog_hash: String::new(),
84            tool_manifest: Vec::new(),
85            sbom: SBOMDoc::default(),
86            signature: None,
87            parent_trust_record_id: None,
88            id: String::new(),
89            name: None,
90            version: String::new(),
91            triggers: Vec::new(),
92            workflow: WorkflowGraph::default(),
93            prompt_capsules: BTreeMap::new(),
94            policy: WorkflowBundlePolicy::default(),
95            connectors: Vec::new(),
96            environment: EnvironmentRequirements::default(),
97            receipts: WorkflowBundleReplayMetadata::default(),
98            metadata: BTreeMap::new(),
99        }
100    }
101}
102
103#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
104#[serde(default)]
105pub struct ModuleEntry {
106    pub path: PathBuf,
107    pub source_hash_blake3: String,
108    pub harnbc_hash_blake3: String,
109}
110
111#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
112#[serde(default)]
113pub struct ToolEntry {
114    pub name: String,
115    pub provider: Option<String>,
116    pub annotations: Option<ToolAnnotations>,
117    pub schema_hash_blake3: Option<String>,
118    pub metadata: BTreeMap<String, String>,
119}
120
121#[allow(clippy::upper_case_acronyms)]
122#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
123#[serde(default)]
124pub struct SBOMDoc {
125    pub format: String,
126    pub version: String,
127    pub packages: Vec<SBOMPackage>,
128    pub relationships: Vec<SBOMRelationship>,
129}
130
131#[allow(clippy::upper_case_acronyms)]
132#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
133#[serde(default)]
134pub struct SBOMPackage {
135    pub name: String,
136    pub version: Option<String>,
137    pub package_hash_blake3: Option<String>,
138    pub license: Option<String>,
139}
140
141#[allow(clippy::upper_case_acronyms)]
142#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
143#[serde(default)]
144pub struct SBOMRelationship {
145    pub from: String,
146    pub to: String,
147    pub relationship_type: String,
148}
149
150#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
151#[serde(default)]
152pub struct Ed25519Signature {
153    pub key_id: Option<String>,
154    pub public_key: String,
155    pub signature: String,
156    pub manifest_hash_blake3: String,
157    pub algorithm: String,
158}
159
160#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
161#[serde(default)]
162pub struct WorkflowBundleTrigger {
163    pub id: String,
164    pub kind: String,
165    pub provider: Option<String>,
166    pub events: Vec<String>,
167    pub schedule: Option<String>,
168    pub delay: Option<String>,
169    pub webhook_path: Option<String>,
170    pub mcp_tool: Option<String>,
171    pub resume_key: Option<String>,
172    pub node_id: Option<String>,
173    pub metadata: BTreeMap<String, String>,
174}
175
176#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
177#[serde(default)]
178pub struct PromptCapsule {
179    pub id: String,
180    pub node_id: String,
181    pub trigger_id: Option<String>,
182    pub prompt: String,
183    pub system: Option<String>,
184    pub context: BTreeMap<String, serde_json::Value>,
185}
186
187#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
188#[serde(default)]
189pub struct WorkflowBundlePolicy {
190    pub autonomy_tier: String,
191    pub tool_policy: BTreeMap<String, serde_json::Value>,
192    pub approval_required: Vec<String>,
193    pub retry: RetryPolicySpec,
194    pub catchup: CatchupPolicySpec,
195}
196
197impl Default for WorkflowBundlePolicy {
198    fn default() -> Self {
199        Self {
200            autonomy_tier: "act_with_approval".to_string(),
201            tool_policy: BTreeMap::new(),
202            approval_required: Vec::new(),
203            retry: RetryPolicySpec::default(),
204            catchup: CatchupPolicySpec::default(),
205        }
206    }
207}
208
209#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
210#[serde(default)]
211pub struct RetryPolicySpec {
212    pub max_attempts: u32,
213    pub backoff: String,
214}
215
216impl Default for RetryPolicySpec {
217    fn default() -> Self {
218        Self {
219            max_attempts: 1,
220            backoff: "none".to_string(),
221        }
222    }
223}
224
225#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
226#[serde(default)]
227pub struct CatchupPolicySpec {
228    pub mode: String,
229    pub max_events: Option<u32>,
230}
231
232impl Default for CatchupPolicySpec {
233    fn default() -> Self {
234        Self {
235            mode: "latest".to_string(),
236            max_events: Some(1),
237        }
238    }
239}
240
241#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
242#[serde(default)]
243pub struct ConnectorRequirement {
244    pub id: String,
245    pub provider_id: String,
246    pub scopes: Vec<String>,
247    pub setup_required: bool,
248    pub status_required: bool,
249}
250
251#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
252#[serde(default)]
253pub struct EnvironmentRequirements {
254    pub repo_setup_profile: Option<String>,
255    pub worktree_policy: String,
256    pub command_gates: Vec<String>,
257}
258
259impl Default for EnvironmentRequirements {
260    fn default() -> Self {
261        Self {
262            repo_setup_profile: None,
263            worktree_policy: "host_managed".to_string(),
264            command_gates: Vec::new(),
265        }
266    }
267}
268
269#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
270#[serde(default)]
271pub struct WorkflowBundleReplayMetadata {
272    pub run_id: Option<String>,
273    pub event_ids: Vec<String>,
274    pub workflow_version: Option<usize>,
275    pub graph_digest: Option<String>,
276}
277
278#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
279#[serde(default)]
280pub struct WorkflowBundleDiagnostic {
281    pub severity: String,
282    pub path: String,
283    pub message: String,
284    pub node_id: Option<String>,
285}
286
287#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
288#[serde(default)]
289pub struct WorkflowBundleValidationReport {
290    pub valid: bool,
291    pub bundle_id: String,
292    pub workflow_id: String,
293    pub graph_digest: String,
294    pub errors: Vec<WorkflowBundleDiagnostic>,
295    pub warnings: Vec<WorkflowBundleDiagnostic>,
296}
297
298#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
299pub struct WorkflowBundlePreview {
300    pub schema_version: u32,
301    pub bundle_id: String,
302    pub bundle_version: String,
303    pub workflow_id: String,
304    pub workflow_version: usize,
305    pub graph_digest: String,
306    pub validation: WorkflowBundleValidationReport,
307    pub graph: WorkflowBundleGraphExport,
308    pub mermaid: String,
309    pub triggers: Vec<WorkflowBundleTrigger>,
310    pub connectors: Vec<ConnectorRequirement>,
311    pub environment: EnvironmentRequirements,
312    pub nodes: Vec<WorkflowBundlePreviewNode>,
313    pub edges: Vec<WorkflowEdge>,
314}
315
316#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
317pub struct WorkflowBundlePreviewNode {
318    pub id: String,
319    pub kind: String,
320    pub label: Option<String>,
321    pub prompt_capsule: Option<String>,
322    pub trigger_ids: Vec<String>,
323    pub outgoing: Vec<String>,
324}
325
326#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
327pub struct WorkflowBundleGraphExport {
328    pub schema_version: u32,
329    pub graph_id: String,
330    pub graph_digest: String,
331    pub nodes: Vec<WorkflowBundleGraphNode>,
332    pub edges: Vec<WorkflowBundleGraphEdge>,
333    pub diagnostics: Vec<WorkflowBundleGraphDiagnostic>,
334    pub editable_fields: Vec<WorkflowBundleEditableField>,
335    pub mermaid: String,
336}
337
338#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
339pub struct WorkflowBundleGraphNode {
340    pub id: String,
341    pub node_type: String,
342    pub label: String,
343    pub workflow_node_id: Option<String>,
344    pub trigger_id: Option<String>,
345    pub connector_id: Option<String>,
346    pub editable_fields: Vec<WorkflowBundleEditableField>,
347    pub metadata: BTreeMap<String, serde_json::Value>,
348}
349
350#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
351pub struct WorkflowBundleGraphEdge {
352    pub from: String,
353    pub to: String,
354    pub label: Option<String>,
355    pub branch: Option<String>,
356}
357
358#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
359pub struct WorkflowBundleGraphDiagnostic {
360    pub severity: String,
361    pub path: String,
362    pub message: String,
363    pub node_id: Option<String>,
364    pub graph_node_id: Option<String>,
365}
366
367#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
368pub struct WorkflowBundleEditableField {
369    pub id: String,
370    pub label: String,
371    pub json_pointer: String,
372    pub value_type: String,
373    pub required: bool,
374    pub enum_values: Vec<String>,
375}
376
377#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
378#[serde(default)]
379pub struct WorkflowBundleRunRequest {
380    pub trigger_id: Option<String>,
381    pub event_id: Option<String>,
382}
383
384#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
385pub struct WorkflowBundleRunReceipt {
386    pub schema_version: u32,
387    pub receipt_type: String,
388    pub bundle_id: String,
389    pub bundle_version: String,
390    pub workflow_id: String,
391    pub workflow_version: usize,
392    pub graph_digest: String,
393    pub run_id: String,
394    pub trigger_id: Option<String>,
395    pub event_ids: Vec<String>,
396    pub status: String,
397    pub executed_nodes: Vec<WorkflowBundleRunNodeReceipt>,
398    pub policy: WorkflowBundlePolicy,
399    pub connectors: Vec<ConnectorRequirement>,
400    pub environment: EnvironmentRequirements,
401}
402
403#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
404pub struct WorkflowBundleRunNodeReceipt {
405    pub node_id: String,
406    pub kind: String,
407    pub prompt_capsule: Option<String>,
408    pub status: String,
409}
410
/// A file carried in a `.harnpack` archive next to the manifest.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HarnpackEntry {
    /// Path of the file; normalized to a relative, `/`-joined archive path
    /// when the archive is built or read.
    pub path: PathBuf,
    /// Raw file contents.
    pub bytes: Vec<u8>,
    /// Mode written to (or read from) the entry's tar header.
    pub mode: u32,
}
417
418impl HarnpackEntry {
419    pub fn new(path: impl Into<PathBuf>, bytes: impl Into<Vec<u8>>) -> Self {
420        Self {
421            path: path.into(),
422            bytes: bytes.into(),
423            mode: DEFAULT_HARNPACK_FILE_MODE,
424        }
425    }
426
427    pub fn with_mode(mut self, mode: u32) -> Self {
428        self.mode = mode;
429        self
430    }
431}
432
433#[derive(Clone, Debug, PartialEq)]
434pub struct HarnpackArchive {
435    pub manifest: WorkflowBundle,
436    pub contents: Vec<HarnpackEntry>,
437}
438
/// Category of a `WorkflowBundleError`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkflowBundleErrorKind {
    /// An underlying `std::io::Error` (file read, decompression, tar access).
    Io,
    /// JSON (de)serialization failed.
    Json,
    /// The manifest has no usable `schema_version`.
    MissingSchemaVersion,
    /// The manifest declares a schema version other than the supported one.
    /// `actual` is `u32::MAX` when the declared value does not fit in a `u32`.
    UnsupportedSchemaVersion { actual: u32, expected: u32 },
    /// The archive is structurally unusable (oversized, or missing its manifest).
    InvalidArchive,
    /// Two entries share a path, or content tries to replace the manifest.
    DuplicateArchiveEntry,
    /// An entry path is absolute, escapes via `..`, is empty, or is not UTF-8.
    UnsafeArchivePath,
    /// The bundle is unsigned or its signature did not verify.
    InvalidSignature,
}
450
451#[derive(Clone, Debug, PartialEq, Eq)]
452pub struct WorkflowBundleError {
453    pub kind: WorkflowBundleErrorKind,
454    pub message: String,
455}
456
457impl WorkflowBundleError {
458    fn new(kind: WorkflowBundleErrorKind, message: impl Into<String>) -> Self {
459        Self {
460            kind,
461            message: message.into(),
462        }
463    }
464
465    fn unsupported_schema_version(actual: u32) -> Self {
466        Self::new(
467            WorkflowBundleErrorKind::UnsupportedSchemaVersion {
468                actual,
469                expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
470            },
471            format!(
472                "unsupported workflow bundle schema_version {actual}; expected {WORKFLOW_BUNDLE_SCHEMA_VERSION}"
473            ),
474        )
475    }
476}
477
478impl std::fmt::Display for WorkflowBundleError {
479    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
480        self.message.fmt(f)
481    }
482}
483
484impl std::error::Error for WorkflowBundleError {}
485
486impl From<std::io::Error> for WorkflowBundleError {
487    fn from(error: std::io::Error) -> Self {
488        Self::new(WorkflowBundleErrorKind::Io, error.to_string())
489    }
490}
491
492impl From<serde_json::Error> for WorkflowBundleError {
493    fn from(error: serde_json::Error) -> Self {
494        Self::new(WorkflowBundleErrorKind::Json, error.to_string())
495    }
496}
497
498pub fn load_workflow_bundle(path: &Path) -> Result<WorkflowBundle, WorkflowBundleError> {
499    let bytes = fs::read(path)?;
500    if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
501        read_harnpack(&bytes).map(|archive| archive.manifest)
502    } else {
503        parse_workflow_bundle_manifest(&bytes)
504    }
505}
506
507/// Read a manifest from any prior or current schema version without
508/// rejecting on a `schema_version` mismatch. Used by `harn pack
509/// --upgrade` to read v1 bundles before re-emitting them under the v2
510/// shape. Fields the older schema didn't carry deserialize to their
511/// type defaults via `#[serde(default)]`.
512pub fn read_workflow_bundle_manifest_any_version(
513    bytes: &[u8],
514) -> Result<WorkflowBundle, WorkflowBundleError> {
515    let value: serde_json::Value = serde_json::from_slice(bytes)?;
516    if value.get("schema_version").is_none() {
517        return Err(WorkflowBundleError::new(
518            WorkflowBundleErrorKind::MissingSchemaVersion,
519            "workflow bundle manifest is missing schema_version",
520        ));
521    }
522    serde_json::from_value(value).map_err(Into::into)
523}
524
525/// Variant of [`load_workflow_bundle`] that accepts any historical
526/// schema version. Reads the manifest from a `.harnpack` archive or a
527/// bare JSON manifest as appropriate.
528pub fn load_workflow_bundle_any_version(
529    path: &Path,
530) -> Result<WorkflowBundle, WorkflowBundleError> {
531    let bytes = fs::read(path)?;
532    if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
533        let tar_bytes = decompress_harnpack_zstd(&bytes)?;
534        let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
535        for entry in archive.entries()? {
536            let mut entry = entry?;
537            if entry.header().entry_type().is_dir() {
538                continue;
539            }
540            let path = normalize_archive_path(entry.path()?.as_ref())?;
541            if path == HARNPACK_MANIFEST_PATH {
542                let mut entry_bytes = Vec::new();
543                entry.read_to_end(&mut entry_bytes)?;
544                return read_workflow_bundle_manifest_any_version(&entry_bytes);
545            }
546        }
547        Err(WorkflowBundleError::new(
548            WorkflowBundleErrorKind::InvalidArchive,
549            format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
550        ))
551    } else {
552        read_workflow_bundle_manifest_any_version(&bytes)
553    }
554}
555
556pub fn parse_workflow_bundle_manifest(bytes: &[u8]) -> Result<WorkflowBundle, WorkflowBundleError> {
557    let value: serde_json::Value = serde_json::from_slice(bytes)?;
558    let schema_version = value
559        .get("schema_version")
560        .and_then(serde_json::Value::as_u64)
561        .ok_or_else(|| {
562            WorkflowBundleError::new(
563                WorkflowBundleErrorKind::MissingSchemaVersion,
564                "workflow bundle manifest is missing numeric schema_version",
565            )
566        })?;
567    let actual = u32::try_from(schema_version).map_err(|_| {
568        WorkflowBundleError::new(
569            WorkflowBundleErrorKind::UnsupportedSchemaVersion {
570                actual: u32::MAX,
571                expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
572            },
573            format!(
574                "unsupported workflow bundle schema_version {schema_version}; expected {WORKFLOW_BUNDLE_SCHEMA_VERSION}"
575            ),
576        )
577    })?;
578    if actual != WORKFLOW_BUNDLE_SCHEMA_VERSION {
579        return Err(WorkflowBundleError::unsupported_schema_version(actual));
580    }
581    serde_json::from_value(value).map_err(Into::into)
582}
583
584pub fn canonical_workflow_bundle_manifest_bytes(
585    bundle: &WorkflowBundle,
586) -> Result<Vec<u8>, WorkflowBundleError> {
587    serde_json::to_vec(&canonical_workflow_bundle_manifest(bundle)).map_err(Into::into)
588}
589
590pub fn workflow_bundle_hash(
591    bundle: &WorkflowBundle,
592    contents: &[HarnpackEntry],
593) -> Result<String, WorkflowBundleError> {
594    let mut hasher = blake3::Hasher::new();
595    let mut canonical = canonical_workflow_bundle_manifest(bundle);
596    canonical.signature = None;
597    let manifest_bytes = serde_json::to_vec(&canonical)?;
598    hasher.update(&manifest_bytes);
599
600    let mut content_hashes = contents
601        .iter()
602        .map(|entry| blake3_hash_bytes(&entry.bytes))
603        .collect::<Vec<_>>();
604    content_hashes.sort();
605    for content_hash in content_hashes {
606        hasher.update(b"\n");
607        hasher.update(content_hash.as_bytes());
608    }
609
610    Ok(blake3_digest_string(hasher.finalize()))
611}
612
613pub fn verify_workflow_bundle_signature(
614    bundle: &WorkflowBundle,
615    contents: &[HarnpackEntry],
616) -> Result<(), WorkflowBundleError> {
617    let signature = bundle.signature.as_ref().ok_or_else(|| {
618        WorkflowBundleError::new(
619            WorkflowBundleErrorKind::InvalidSignature,
620            "workflow bundle is unsigned",
621        )
622    })?;
623    if signature.algorithm != "ed25519" {
624        return Err(WorkflowBundleError::new(
625            WorkflowBundleErrorKind::InvalidSignature,
626            format!(
627                "unsupported workflow bundle signature algorithm {}",
628                signature.algorithm
629            ),
630        ));
631    }
632    let expected_hash = workflow_bundle_hash(bundle, contents)?;
633    if signature.manifest_hash_blake3 != expected_hash {
634        return Err(WorkflowBundleError::new(
635            WorkflowBundleErrorKind::InvalidSignature,
636            format!(
637                "workflow bundle signature hash mismatch; expected {expected_hash}, found {}",
638                signature.manifest_hash_blake3
639            ),
640        ));
641    }
642    let public_key_bytes = decode_hex_exact::<32>(
643        "workflow bundle signature public_key",
644        &signature.public_key,
645    )?;
646    let signature_bytes =
647        decode_hex_exact::<64>("workflow bundle signature", &signature.signature)?;
648    let verifying_key = VerifyingKey::from_bytes(&public_key_bytes).map_err(|error| {
649        WorkflowBundleError::new(
650            WorkflowBundleErrorKind::InvalidSignature,
651            format!("workflow bundle signature public_key is invalid Ed25519: {error}"),
652        )
653    })?;
654    let ed25519_signature = Signature::from_bytes(&signature_bytes);
655    verifying_key
656        .verify(expected_hash.as_bytes(), &ed25519_signature)
657        .map_err(|error| {
658            WorkflowBundleError::new(
659                WorkflowBundleErrorKind::InvalidSignature,
660                format!("workflow bundle signature failed Ed25519 verification: {error}"),
661            )
662        })
663}
664
665pub fn build_harnpack(
666    bundle: &WorkflowBundle,
667    contents: &[HarnpackEntry],
668) -> Result<Vec<u8>, WorkflowBundleError> {
669    let manifest_bytes = canonical_workflow_bundle_manifest_bytes(bundle)?;
670    let mut entries = contents
671        .iter()
672        .map(|entry| {
673            normalize_archive_path(&entry.path).map(|path| (path, entry.bytes.clone(), entry.mode))
674        })
675        .collect::<Result<Vec<_>, _>>()?;
676    entries.sort_by(|left, right| left.0.cmp(&right.0));
677
678    let mut seen = BTreeSet::new();
679    for (path, _, _) in &entries {
680        if path == HARNPACK_MANIFEST_PATH {
681            return Err(WorkflowBundleError::new(
682                WorkflowBundleErrorKind::DuplicateArchiveEntry,
683                format!("archive content cannot replace {HARNPACK_MANIFEST_PATH}"),
684            ));
685        }
686        if !seen.insert(path.clone()) {
687            return Err(WorkflowBundleError::new(
688                WorkflowBundleErrorKind::DuplicateArchiveEntry,
689                format!("duplicate archive entry: {path}"),
690            ));
691        }
692    }
693
694    let mut tar_bytes = Vec::new();
695    {
696        let mut builder = tar::Builder::new(&mut tar_bytes);
697        append_harnpack_entry(
698            &mut builder,
699            HARNPACK_MANIFEST_PATH,
700            &manifest_bytes,
701            DEFAULT_HARNPACK_FILE_MODE,
702        )?;
703        for (path, bytes, mode) in entries {
704            append_harnpack_entry(&mut builder, &path, &bytes, mode)?;
705        }
706        builder.finish()?;
707    }
708
709    zstd::stream::encode_all(Cursor::new(tar_bytes), 0).map_err(Into::into)
710}
711
712pub fn read_harnpack(bytes: &[u8]) -> Result<HarnpackArchive, WorkflowBundleError> {
713    let tar_bytes = decompress_harnpack_zstd(bytes)?;
714    let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
715    let mut manifest = None;
716    let mut contents = Vec::new();
717    let mut seen = BTreeSet::new();
718
719    for entry in archive.entries()? {
720        let mut entry = entry?;
721        if entry.header().entry_type().is_dir() {
722            continue;
723        }
724        let path = normalize_archive_path(entry.path()?.as_ref())?;
725        if !seen.insert(path.clone()) {
726            return Err(WorkflowBundleError::new(
727                WorkflowBundleErrorKind::DuplicateArchiveEntry,
728                format!("duplicate archive entry: {path}"),
729            ));
730        }
731        let mode = entry.header().mode().unwrap_or(DEFAULT_HARNPACK_FILE_MODE);
732        let mut entry_bytes = Vec::new();
733        entry.read_to_end(&mut entry_bytes)?;
734
735        if path == HARNPACK_MANIFEST_PATH {
736            manifest = Some(parse_workflow_bundle_manifest(&entry_bytes)?);
737        } else {
738            contents.push(HarnpackEntry {
739                path: PathBuf::from(path),
740                bytes: entry_bytes,
741                mode,
742            });
743        }
744    }
745
746    contents.sort_by(|left, right| left.path.cmp(&right.path));
747    Ok(HarnpackArchive {
748        manifest: manifest.ok_or_else(|| {
749            WorkflowBundleError::new(
750                WorkflowBundleErrorKind::InvalidArchive,
751                format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
752            )
753        })?,
754        contents,
755    })
756}
757
758pub fn current_provider_catalog_hash_blake3() -> Result<String, WorkflowBundleError> {
759    let bytes = serde_json::to_vec(&crate::provider_catalog::artifact())?;
760    Ok(blake3_hash_bytes(&bytes))
761}
762
763pub fn workflow_graph_digest(graph: &WorkflowGraph) -> String {
764    let mut canonical = canonical_workflow_graph(graph);
765    canonical.audit_log.clear();
766    let bytes = serde_json::to_vec(&canonical).expect("workflow graph serializes");
767    let digest = Sha256::digest(bytes);
768    let hex = digest
769        .iter()
770        .map(|byte| format!("{byte:02x}"))
771        .collect::<String>();
772    format!("sha256:{hex}")
773}
774
775fn canonical_workflow_bundle_manifest(bundle: &WorkflowBundle) -> WorkflowBundle {
776    let mut canonical = bundle.clone();
777    canonical.workflow = canonical_workflow_graph(&bundle.workflow);
778    canonical.transitive_modules.sort_by(|left, right| {
779        (
780            path_sort_key(&left.path),
781            &left.source_hash_blake3,
782            &left.harnbc_hash_blake3,
783        )
784            .cmp(&(
785                path_sort_key(&right.path),
786                &right.source_hash_blake3,
787                &right.harnbc_hash_blake3,
788            ))
789    });
790    canonical.tool_manifest.sort_by(|left, right| {
791        (&left.name, &left.provider, &left.schema_hash_blake3).cmp(&(
792            &right.name,
793            &right.provider,
794            &right.schema_hash_blake3,
795        ))
796    });
797    canonical.sbom.packages.sort_by(|left, right| {
798        (&left.name, &left.version, &left.package_hash_blake3).cmp(&(
799            &right.name,
800            &right.version,
801            &right.package_hash_blake3,
802        ))
803    });
804    canonical.sbom.relationships.sort_by(|left, right| {
805        (&left.from, &left.to, &left.relationship_type).cmp(&(
806            &right.from,
807            &right.to,
808            &right.relationship_type,
809        ))
810    });
811    canonical
812}
813
814fn decode_hex_exact<const N: usize>(
815    label: &str,
816    value: &str,
817) -> Result<[u8; N], WorkflowBundleError> {
818    let bytes = hex::decode(value).map_err(|error| {
819        WorkflowBundleError::new(
820            WorkflowBundleErrorKind::InvalidSignature,
821            format!("{label} is not hex: {error}"),
822        )
823    })?;
824    bytes.try_into().map_err(|bytes: Vec<u8>| {
825        WorkflowBundleError::new(
826            WorkflowBundleErrorKind::InvalidSignature,
827            format!("{label} must decode to {N} bytes, got {}", bytes.len()),
828        )
829    })
830}
831
832fn append_harnpack_entry<W: std::io::Write>(
833    builder: &mut tar::Builder<W>,
834    path: &str,
835    bytes: &[u8],
836    mode: u32,
837) -> Result<(), WorkflowBundleError> {
838    let mut header = tar::Header::new_gnu();
839    header.set_path(path).map_err(|error| {
840        WorkflowBundleError::new(
841            WorkflowBundleErrorKind::UnsafeArchivePath,
842            format!("invalid archive path {path}: {error}"),
843        )
844    })?;
845    header.set_entry_type(tar::EntryType::Regular);
846    header.set_size(bytes.len() as u64);
847    header.set_mode(mode);
848    header.set_mtime(0);
849    header.set_uid(0);
850    header.set_gid(0);
851    header.set_cksum();
852    builder.append(&header, bytes)?;
853    Ok(())
854}
855
856fn normalize_archive_path(path: &Path) -> Result<String, WorkflowBundleError> {
857    let mut parts = Vec::new();
858    for component in path.components() {
859        match component {
860            Component::Normal(part) => {
861                let Some(part) = part.to_str() else {
862                    return Err(WorkflowBundleError::new(
863                        WorkflowBundleErrorKind::UnsafeArchivePath,
864                        format!("archive path is not valid UTF-8: {}", path.display()),
865                    ));
866                };
867                if part.is_empty() {
868                    return Err(WorkflowBundleError::new(
869                        WorkflowBundleErrorKind::UnsafeArchivePath,
870                        "archive path contains an empty component",
871                    ));
872                }
873                parts.push(part.to_string());
874            }
875            Component::CurDir => {}
876            Component::ParentDir | Component::RootDir | Component::Prefix(_) => {
877                return Err(WorkflowBundleError::new(
878                    WorkflowBundleErrorKind::UnsafeArchivePath,
879                    format!(
880                        "archive path must be relative and contained: {}",
881                        path.display()
882                    ),
883                ));
884            }
885        }
886    }
887    if parts.is_empty() {
888        return Err(WorkflowBundleError::new(
889            WorkflowBundleErrorKind::UnsafeArchivePath,
890            "archive path is empty",
891        ));
892    }
893    Ok(parts.join("/"))
894}
895
/// Build a platform-independent sort key for `path`.
///
/// Only normal components that are valid UTF-8 contribute; root, prefix, `.`
/// and `..` components and non-UTF-8 components are silently skipped. The
/// kept components are joined with `/`.
fn path_sort_key(path: &Path) -> String {
    let mut kept: Vec<&str> = Vec::new();
    for component in path.components() {
        if let Component::Normal(part) = component {
            if let Some(text) = part.to_str() {
                kept.push(text);
            }
        }
    }
    kept.join("/")
}
905
906fn blake3_hash_bytes(bytes: &[u8]) -> String {
907    blake3_digest_string(blake3::hash(bytes))
908}
909
910fn blake3_digest_string(hash: blake3::Hash) -> String {
911    format!("blake3:{hash}")
912}
913
914pub fn validate_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundleValidationReport {
915    let canonical = canonical_workflow_graph(&bundle.workflow);
916    let mut report = WorkflowBundleValidationReport {
917        valid: true,
918        bundle_id: bundle.id.clone(),
919        workflow_id: canonical.id.clone(),
920        graph_digest: workflow_graph_digest(&canonical),
921        errors: Vec::new(),
922        warnings: Vec::new(),
923    };
924
925    validate_manifest_contract(bundle, &mut report);
926    validate_bundle_identity(bundle, &canonical, &mut report);
927    validate_triggers(bundle, &canonical, &mut report);
928    validate_prompt_capsules(bundle, &canonical, &mut report);
929    validate_policy(bundle, &mut report);
930    validate_connectors(bundle, &mut report);
931    validate_environment(bundle, &mut report);
932
933    let graph_report = validate_workflow(&canonical, None);
934    for error in graph_report.errors {
935        let node_id = workflow_diagnostic_node_id(&error, &canonical);
936        push_error(&mut report, "workflow", error, node_id);
937    }
938    for warning in graph_report.warnings {
939        let node_id = workflow_diagnostic_node_id(&warning, &canonical);
940        push_warning(&mut report, "workflow", warning, node_id);
941    }
942
943    if let Some(expected) = bundle.receipts.graph_digest.as_deref() {
944        if expected != report.graph_digest {
945            let actual = report.graph_digest.clone();
946            push_error(
947                &mut report,
948                "receipts.graph_digest",
949                format!("graph digest mismatch: expected {expected}, computed {actual}"),
950                None,
951            );
952        }
953    }
954    if let Some(expected_version) = bundle.receipts.workflow_version {
955        if expected_version != canonical.version {
956            push_error(
957                &mut report,
958                "receipts.workflow_version",
959                format!(
960                    "workflow version mismatch: expected {expected_version}, computed {}",
961                    canonical.version
962                ),
963                None,
964            );
965        }
966    }
967
968    report.valid = report.errors.is_empty();
969    report
970}
971
972pub fn preview_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundlePreview {
973    let canonical = canonical_workflow_graph(&bundle.workflow);
974    let validation = validate_workflow_bundle(bundle);
975    let graph = export_workflow_bundle_graph(bundle, &validation);
976    let mermaid = graph.mermaid.clone();
977    let triggers_by_node = triggers_by_node(bundle);
978    let capsules_by_node = capsules_by_node(bundle);
979    let mut nodes = Vec::new();
980
981    for (node_id, node) in &canonical.nodes {
982        let mut outgoing = canonical
983            .edges
984            .iter()
985            .filter(|edge| edge.from == *node_id)
986            .map(|edge| edge.to.clone())
987            .collect::<Vec<_>>();
988        outgoing.sort();
989        outgoing.dedup();
990        nodes.push(WorkflowBundlePreviewNode {
991            id: node_id.clone(),
992            kind: node.kind.clone(),
993            label: node.task_label.clone(),
994            prompt_capsule: capsules_by_node.get(node_id).cloned(),
995            trigger_ids: triggers_by_node.get(node_id).cloned().unwrap_or_default(),
996            outgoing,
997        });
998    }
999
1000    WorkflowBundlePreview {
1001        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1002        bundle_id: bundle.id.clone(),
1003        bundle_version: bundle.version.clone(),
1004        workflow_id: canonical.id.clone(),
1005        workflow_version: canonical.version,
1006        graph_digest: validation.graph_digest.clone(),
1007        validation,
1008        graph,
1009        mermaid,
1010        triggers: bundle.triggers.clone(),
1011        connectors: bundle.connectors.clone(),
1012        environment: bundle.environment.clone(),
1013        nodes,
1014        edges: sorted_edges(&canonical),
1015    }
1016}
1017
1018pub fn export_workflow_bundle_graph(
1019    bundle: &WorkflowBundle,
1020    validation: &WorkflowBundleValidationReport,
1021) -> WorkflowBundleGraphExport {
1022    let canonical = canonical_workflow_graph(&bundle.workflow);
1023    let mut nodes = Vec::new();
1024    let mut edges = Vec::new();
1025    let mut editable_fields = Vec::new();
1026    let capsules_by_node = capsules_by_node(bundle);
1027    let catchup_enabled = bundle.policy.catchup.mode != "none";
1028    let retry_can_dlq = bundle.policy.retry.max_attempts > 1;
1029
1030    for (index, connector) in bundle.connectors.iter().enumerate() {
1031        let node_fields = connector_editable_fields(index, connector);
1032        editable_fields.extend(node_fields.clone());
1033        nodes.push(WorkflowBundleGraphNode {
1034            id: connector_graph_id(&connector.id),
1035            node_type: "connector_call".to_string(),
1036            label: connector_label(connector),
1037            workflow_node_id: None,
1038            trigger_id: None,
1039            connector_id: Some(connector.id.clone()),
1040            editable_fields: node_fields,
1041            metadata: BTreeMap::from([
1042                (
1043                    "provider_id".to_string(),
1044                    serde_json::json!(connector.provider_id),
1045                ),
1046                ("scopes".to_string(), serde_json::json!(connector.scopes)),
1047            ]),
1048        });
1049    }
1050
1051    let catchup_fields = catchup_editable_fields();
1052    let retry_fields = retry_editable_fields();
1053    if catchup_enabled {
1054        let node_fields = catchup_fields;
1055        editable_fields.extend(node_fields.clone());
1056        nodes.push(WorkflowBundleGraphNode {
1057            id: catchup_graph_id(),
1058            node_type: "catchup".to_string(),
1059            label: "Catch up".to_string(),
1060            workflow_node_id: None,
1061            trigger_id: None,
1062            connector_id: None,
1063            editable_fields: node_fields,
1064            metadata: BTreeMap::from([(
1065                "mode".to_string(),
1066                serde_json::json!(bundle.policy.catchup.mode),
1067            )]),
1068        });
1069    } else {
1070        editable_fields.extend(catchup_fields);
1071    }
1072    if retry_can_dlq {
1073        let node_fields = retry_fields;
1074        editable_fields.extend(node_fields.clone());
1075        nodes.push(WorkflowBundleGraphNode {
1076            id: dlq_graph_id(),
1077            node_type: "dlq".to_string(),
1078            label: "Dead letter queue".to_string(),
1079            workflow_node_id: None,
1080            trigger_id: None,
1081            connector_id: None,
1082            editable_fields: node_fields,
1083            metadata: BTreeMap::from([(
1084                "max_attempts".to_string(),
1085                serde_json::json!(bundle.policy.retry.max_attempts),
1086            )]),
1087        });
1088    } else {
1089        editable_fields.extend(retry_fields);
1090    }
1091
1092    for (index, trigger) in bundle.triggers.iter().enumerate() {
1093        let node_fields = trigger_editable_fields(index, trigger);
1094        editable_fields.extend(node_fields.clone());
1095        nodes.push(WorkflowBundleGraphNode {
1096            id: trigger_graph_id(&trigger.id),
1097            node_type: "trigger".to_string(),
1098            label: trigger_label(trigger),
1099            workflow_node_id: trigger.node_id.clone(),
1100            trigger_id: Some(trigger.id.clone()),
1101            connector_id: None,
1102            editable_fields: node_fields,
1103            metadata: BTreeMap::from([
1104                ("kind".to_string(), serde_json::json!(trigger.kind)),
1105                ("provider".to_string(), serde_json::json!(trigger.provider)),
1106                ("events".to_string(), serde_json::json!(trigger.events)),
1107            ]),
1108        });
1109        if let Some(provider) = trigger.provider.as_deref() {
1110            if let Some(connector) = bundle
1111                .connectors
1112                .iter()
1113                .find(|connector| connector.provider_id == provider || connector.id == provider)
1114            {
1115                edges.push(WorkflowBundleGraphEdge {
1116                    from: connector_graph_id(&connector.id),
1117                    to: trigger_graph_id(&trigger.id),
1118                    label: Some("binds".to_string()),
1119                    branch: None,
1120                });
1121            }
1122        }
1123        let target = trigger
1124            .node_id
1125            .clone()
1126            .unwrap_or_else(|| canonical.entry.clone());
1127        if catchup_enabled {
1128            edges.push(WorkflowBundleGraphEdge {
1129                from: trigger_graph_id(&trigger.id),
1130                to: catchup_graph_id(),
1131                label: Some(bundle.policy.catchup.mode.clone()),
1132                branch: Some("catchup".to_string()),
1133            });
1134            edges.push(WorkflowBundleGraphEdge {
1135                from: catchup_graph_id(),
1136                to: workflow_graph_id(&target),
1137                label: Some("dispatch".to_string()),
1138                branch: None,
1139            });
1140        } else {
1141            edges.push(WorkflowBundleGraphEdge {
1142                from: trigger_graph_id(&trigger.id),
1143                to: workflow_graph_id(&target),
1144                label: Some("dispatch".to_string()),
1145                branch: None,
1146            });
1147        }
1148    }
1149
1150    for (node_id, node) in &canonical.nodes {
1151        let capsule_id = capsules_by_node.get(node_id);
1152        let node_fields = workflow_node_editable_fields(node_id, capsule_id);
1153        editable_fields.extend(node_fields.clone());
1154        nodes.push(WorkflowBundleGraphNode {
1155            id: workflow_graph_id(node_id),
1156            node_type: workflow_node_type(&node.kind),
1157            label: workflow_node_label(node_id, node),
1158            workflow_node_id: Some(node_id.clone()),
1159            trigger_id: None,
1160            connector_id: None,
1161            editable_fields: node_fields,
1162            metadata: BTreeMap::from([
1163                ("kind".to_string(), serde_json::json!(node.kind)),
1164                ("task_label".to_string(), serde_json::json!(node.task_label)),
1165                (
1166                    "prompt_capsule".to_string(),
1167                    serde_json::json!(capsule_id.cloned()),
1168                ),
1169            ]),
1170        });
1171    }
1172
1173    for edge in sorted_edges(&canonical) {
1174        edges.push(WorkflowBundleGraphEdge {
1175            from: workflow_graph_id(&edge.from),
1176            to: workflow_graph_id(&edge.to),
1177            label: edge.label.clone(),
1178            branch: edge.branch.clone(),
1179        });
1180    }
1181
1182    let outgoing: BTreeSet<&str> = canonical
1183        .edges
1184        .iter()
1185        .map(|edge| edge.from.as_str())
1186        .collect();
1187    for node_id in canonical.nodes.keys() {
1188        if !outgoing.contains(node_id.as_str()) {
1189            edges.push(WorkflowBundleGraphEdge {
1190                from: workflow_graph_id(node_id),
1191                to: terminal_completed_graph_id(),
1192                label: Some("completed".to_string()),
1193                branch: Some("completed".to_string()),
1194            });
1195        }
1196        if retry_can_dlq {
1197            edges.push(WorkflowBundleGraphEdge {
1198                from: workflow_graph_id(node_id),
1199                to: dlq_graph_id(),
1200                label: Some("retry exhausted".to_string()),
1201                branch: Some("failed".to_string()),
1202            });
1203        }
1204    }
1205
1206    nodes.push(WorkflowBundleGraphNode {
1207        id: terminal_completed_graph_id(),
1208        node_type: "terminal".to_string(),
1209        label: "Completed".to_string(),
1210        workflow_node_id: None,
1211        trigger_id: None,
1212        connector_id: None,
1213        editable_fields: Vec::new(),
1214        metadata: BTreeMap::from([("status".to_string(), serde_json::json!("completed"))]),
1215    });
1216    nodes.push(WorkflowBundleGraphNode {
1217        id: terminal_failed_graph_id(),
1218        node_type: "terminal".to_string(),
1219        label: "Failed".to_string(),
1220        workflow_node_id: None,
1221        trigger_id: None,
1222        connector_id: None,
1223        editable_fields: Vec::new(),
1224        metadata: BTreeMap::from([("status".to_string(), serde_json::json!("failed"))]),
1225    });
1226    if retry_can_dlq {
1227        edges.push(WorkflowBundleGraphEdge {
1228            from: dlq_graph_id(),
1229            to: terminal_failed_graph_id(),
1230            label: Some("failed".to_string()),
1231            branch: Some("failed".to_string()),
1232        });
1233    }
1234
1235    nodes.sort_by(|left, right| left.id.cmp(&right.id));
1236    edges.sort_by(|left, right| {
1237        (&left.from, &left.to, &left.branch, &left.label).cmp(&(
1238            &right.from,
1239            &right.to,
1240            &right.branch,
1241            &right.label,
1242        ))
1243    });
1244    editable_fields.sort_by(|left, right| left.id.cmp(&right.id));
1245
1246    let diagnostics = validation
1247        .errors
1248        .iter()
1249        .chain(validation.warnings.iter())
1250        .map(|diagnostic| WorkflowBundleGraphDiagnostic {
1251            severity: diagnostic.severity.clone(),
1252            path: diagnostic.path.clone(),
1253            message: diagnostic.message.clone(),
1254            node_id: diagnostic.node_id.clone(),
1255            graph_node_id: diagnostic.node_id.as_deref().map(workflow_graph_id),
1256        })
1257        .collect::<Vec<_>>();
1258    let mermaid = render_workflow_bundle_mermaid(&nodes, &edges);
1259
1260    WorkflowBundleGraphExport {
1261        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1262        graph_id: canonical.id,
1263        graph_digest: validation.graph_digest.clone(),
1264        nodes,
1265        edges,
1266        diagnostics,
1267        editable_fields,
1268        mermaid,
1269    }
1270}
1271
1272pub fn run_workflow_bundle(
1273    bundle: &WorkflowBundle,
1274    request: WorkflowBundleRunRequest,
1275) -> Result<WorkflowBundleRunReceipt, WorkflowBundleValidationReport> {
1276    let validation = validate_workflow_bundle(bundle);
1277    if !validation.valid {
1278        return Err(validation);
1279    }
1280
1281    let canonical = canonical_workflow_graph(&bundle.workflow);
1282    let trigger_id = match request.trigger_id {
1283        Some(trigger_id)
1284            if !bundle
1285                .triggers
1286                .iter()
1287                .any(|trigger| trigger.id == trigger_id) =>
1288        {
1289            let mut report = validation;
1290            push_error(
1291                &mut report,
1292                "trigger_id",
1293                format!("unknown trigger id: {trigger_id}"),
1294                None,
1295            );
1296            report.valid = false;
1297            return Err(report);
1298        }
1299        Some(trigger_id) => Some(trigger_id),
1300        None => bundle.triggers.first().map(|trigger| trigger.id.clone()),
1301    };
1302    let mut event_ids = bundle.receipts.event_ids.clone();
1303    if let Some(event_id) = request.event_id {
1304        if !event_ids.contains(&event_id) {
1305            event_ids.push(event_id);
1306        }
1307    }
1308    let run_id = bundle
1309        .receipts
1310        .run_id
1311        .clone()
1312        .unwrap_or_else(|| default_run_id(bundle, &validation.graph_digest));
1313    let capsules_by_node = capsules_by_node(bundle);
1314    let executed_nodes = execution_order(&canonical)
1315        .into_iter()
1316        .map(|node_id| {
1317            let node = canonical
1318                .nodes
1319                .get(&node_id)
1320                .expect("execution order only contains known nodes");
1321            WorkflowBundleRunNodeReceipt {
1322                node_id: node_id.clone(),
1323                kind: node.kind.clone(),
1324                prompt_capsule: capsules_by_node.get(&node_id).cloned(),
1325                status: "completed".to_string(),
1326            }
1327        })
1328        .collect();
1329
1330    Ok(WorkflowBundleRunReceipt {
1331        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1332        receipt_type: WORKFLOW_BUNDLE_RECEIPT_TYPE.to_string(),
1333        bundle_id: bundle.id.clone(),
1334        bundle_version: bundle.version.clone(),
1335        workflow_id: canonical.id,
1336        workflow_version: canonical.version,
1337        graph_digest: validation.graph_digest,
1338        run_id,
1339        trigger_id,
1340        event_ids,
1341        status: "completed".to_string(),
1342        executed_nodes,
1343        policy: bundle.policy.clone(),
1344        connectors: bundle.connectors.clone(),
1345        environment: bundle.environment.clone(),
1346    })
1347}
1348
1349fn canonical_workflow_graph(graph: &WorkflowGraph) -> WorkflowGraph {
1350    let mut canonical = graph.clone();
1351    if canonical.type_name.is_empty() {
1352        canonical.type_name = "workflow_graph".to_string();
1353    }
1354    if canonical.version == 0 {
1355        canonical.version = 1;
1356    }
1357    if canonical.entry.is_empty() {
1358        canonical.entry = canonical.nodes.keys().next().cloned().unwrap_or_default();
1359    }
1360    for (node_id, node) in &mut canonical.nodes {
1361        if node.id.is_none() {
1362            node.id = Some(node_id.clone());
1363        }
1364        if node.kind.is_empty() {
1365            node.kind = "stage".to_string();
1366        }
1367        if node.retry_policy.max_attempts == 0 {
1368            node.retry_policy.max_attempts = 1;
1369        }
1370    }
1371    canonical.edges = sorted_edges(&canonical);
1372    canonical
1373}
1374
1375fn sorted_edges(graph: &WorkflowGraph) -> Vec<WorkflowEdge> {
1376    let mut edges = graph.edges.clone();
1377    edges.sort_by(|left, right| {
1378        (
1379            &left.from,
1380            &left.to,
1381            left.branch.as_deref(),
1382            left.label.as_deref(),
1383        )
1384            .cmp(&(
1385                &right.from,
1386                &right.to,
1387                right.branch.as_deref(),
1388                right.label.as_deref(),
1389            ))
1390    });
1391    edges
1392}
1393
1394fn validate_bundle_identity(
1395    bundle: &WorkflowBundle,
1396    graph: &WorkflowGraph,
1397    report: &mut WorkflowBundleValidationReport,
1398) {
1399    if bundle.schema_version != WORKFLOW_BUNDLE_SCHEMA_VERSION {
1400        push_error(
1401            report,
1402            "schema_version",
1403            format!(
1404                "unsupported schema_version {}; expected {}",
1405                bundle.schema_version, WORKFLOW_BUNDLE_SCHEMA_VERSION
1406            ),
1407            None,
1408        );
1409    }
1410    if bundle.id.trim().is_empty() {
1411        push_error(report, "id", "bundle id is required", None);
1412    }
1413    if bundle.version.trim().is_empty() {
1414        push_error(report, "version", "bundle version is required", None);
1415    }
1416    if graph.id.trim().is_empty() {
1417        push_error(
1418            report,
1419            "workflow.id",
1420            "workflow id is required for portable bundles",
1421            None,
1422        );
1423    }
1424    if graph.nodes.is_empty() {
1425        push_error(
1426            report,
1427            "workflow.nodes",
1428            "workflow must contain nodes",
1429            None,
1430        );
1431    }
1432    for (node_id, node) in &graph.nodes {
1433        if node_id.trim().is_empty() {
1434            push_error(report, "workflow.nodes", "node id is required", None);
1435        }
1436        if node.id.as_deref().is_some_and(|id| id != node_id) {
1437            push_error(
1438                report,
1439                format!("workflow.nodes.{node_id}.id"),
1440                "node id field must match its map key",
1441                Some(node_id.clone()),
1442            );
1443        }
1444    }
1445}
1446
1447fn validate_manifest_contract(
1448    bundle: &WorkflowBundle,
1449    report: &mut WorkflowBundleValidationReport,
1450) {
1451    validate_relative_path(
1452        report,
1453        "entrypoint",
1454        &bundle.entrypoint,
1455        "entrypoint is required",
1456    );
1457    if bundle.transitive_modules.is_empty() {
1458        push_error(
1459            report,
1460            "transitive_modules",
1461            "at least one transitive module entry is required",
1462            None,
1463        );
1464    }
1465    let mut module_paths = BTreeSet::new();
1466    for (index, module) in bundle.transitive_modules.iter().enumerate() {
1467        let path = format!("transitive_modules[{index}]");
1468        validate_relative_path(
1469            report,
1470            format!("{path}.path"),
1471            &module.path,
1472            "module path is required",
1473        );
1474        if !module.path.as_os_str().is_empty() && !module_paths.insert(path_sort_key(&module.path))
1475        {
1476            push_error(
1477                report,
1478                format!("{path}.path"),
1479                format!(
1480                    "duplicate transitive module path: {}",
1481                    module.path.display()
1482                ),
1483                None,
1484            );
1485        }
1486        validate_blake3_hash(
1487            report,
1488            format!("{path}.source_hash_blake3"),
1489            &module.source_hash_blake3,
1490            true,
1491        );
1492        validate_blake3_hash(
1493            report,
1494            format!("{path}.harnbc_hash_blake3"),
1495            &module.harnbc_hash_blake3,
1496            true,
1497        );
1498    }
1499    if bundle.stdlib_version.trim().is_empty() {
1500        push_error(report, "stdlib_version", "stdlib_version is required", None);
1501    }
1502    if bundle.harn_version.trim().is_empty() {
1503        push_error(report, "harn_version", "harn_version is required", None);
1504    }
1505    validate_blake3_hash(
1506        report,
1507        "provider_catalog_hash",
1508        &bundle.provider_catalog_hash,
1509        true,
1510    );
1511
1512    let mut tool_names = BTreeSet::new();
1513    for (index, tool) in bundle.tool_manifest.iter().enumerate() {
1514        let path = format!("tool_manifest[{index}]");
1515        if tool.name.trim().is_empty() {
1516            push_error(
1517                report,
1518                format!("{path}.name"),
1519                "tool manifest entry name is required",
1520                None,
1521            );
1522        } else if !tool_names.insert(tool.name.clone()) {
1523            push_error(
1524                report,
1525                format!("{path}.name"),
1526                format!("duplicate tool manifest entry: {}", tool.name),
1527                None,
1528            );
1529        }
1530        if let Some(hash) = tool.schema_hash_blake3.as_deref() {
1531            validate_blake3_hash(report, format!("{path}.schema_hash_blake3"), hash, true);
1532        }
1533    }
1534
1535    if bundle.sbom.format.trim().is_empty() {
1536        push_error(report, "sbom.format", "SBOM format is required", None);
1537    }
1538    if bundle.sbom.version.trim().is_empty() {
1539        push_error(report, "sbom.version", "SBOM version is required", None);
1540    }
1541    let mut sbom_packages = BTreeSet::new();
1542    for (index, package) in bundle.sbom.packages.iter().enumerate() {
1543        let path = format!("sbom.packages[{index}]");
1544        if package.name.trim().is_empty() {
1545            push_error(
1546                report,
1547                format!("{path}.name"),
1548                "SBOM package name is required",
1549                None,
1550            );
1551        } else if !sbom_packages.insert(package.name.clone()) {
1552            push_error(
1553                report,
1554                format!("{path}.name"),
1555                format!("duplicate SBOM package: {}", package.name),
1556                None,
1557            );
1558        }
1559        if let Some(hash) = package.package_hash_blake3.as_deref() {
1560            validate_blake3_hash(report, format!("{path}.package_hash_blake3"), hash, true);
1561        }
1562    }
1563    for (index, relationship) in bundle.sbom.relationships.iter().enumerate() {
1564        let path = format!("sbom.relationships[{index}]");
1565        if relationship.from.trim().is_empty() {
1566            push_error(
1567                report,
1568                format!("{path}.from"),
1569                "SBOM relationship source is required",
1570                None,
1571            );
1572        }
1573        if relationship.to.trim().is_empty() {
1574            push_error(
1575                report,
1576                format!("{path}.to"),
1577                "SBOM relationship target is required",
1578                None,
1579            );
1580        }
1581        if relationship.relationship_type.trim().is_empty() {
1582            push_error(
1583                report,
1584                format!("{path}.relationship_type"),
1585                "SBOM relationship type is required",
1586                None,
1587            );
1588        }
1589    }
1590
1591    if let Some(parent_id) = bundle.parent_trust_record_id.as_deref() {
1592        if parent_id.trim().is_empty() {
1593            push_error(
1594                report,
1595                "parent_trust_record_id",
1596                "parent_trust_record_id cannot be empty when present",
1597                None,
1598            );
1599        }
1600    }
1601    if let Some(signature) = &bundle.signature {
1602        if signature.algorithm.trim().is_empty() {
1603            push_error(
1604                report,
1605                "signature.algorithm",
1606                "signature algorithm is required",
1607                None,
1608            );
1609        } else if signature.algorithm != "ed25519" {
1610            push_error(
1611                report,
1612                "signature.algorithm",
1613                "signature algorithm must be ed25519",
1614                None,
1615            );
1616        }
1617        if signature.public_key.trim().is_empty() {
1618            push_error(
1619                report,
1620                "signature.public_key",
1621                "signature public_key is required",
1622                None,
1623            );
1624        }
1625        if signature.signature.trim().is_empty() {
1626            push_error(
1627                report,
1628                "signature.signature",
1629                "signature value is required",
1630                None,
1631            );
1632        }
1633        validate_blake3_hash(
1634            report,
1635            "signature.manifest_hash_blake3",
1636            &signature.manifest_hash_blake3,
1637            true,
1638        );
1639    }
1640}
1641
1642fn validate_relative_path(
1643    report: &mut WorkflowBundleValidationReport,
1644    path: impl Into<String>,
1645    value: &Path,
1646    empty_message: &str,
1647) {
1648    let path = path.into();
1649    if value.as_os_str().is_empty() {
1650        push_error(report, path, empty_message, None);
1651        return;
1652    }
1653    if normalize_archive_path(value).is_err() {
1654        push_error(
1655            report,
1656            path,
1657            format!("path must be relative and contained: {}", value.display()),
1658            None,
1659        );
1660    }
1661}
1662
1663fn validate_blake3_hash(
1664    report: &mut WorkflowBundleValidationReport,
1665    path: impl Into<String>,
1666    value: &str,
1667    required: bool,
1668) {
1669    let path = path.into();
1670    if value.trim().is_empty() {
1671        if required {
1672            push_error(report, path, "BLAKE3 hash is required", None);
1673        }
1674        return;
1675    }
1676    let Some(hex) = value.strip_prefix("blake3:") else {
1677        push_error(report, path, "BLAKE3 hash must use blake3:<hex>", None);
1678        return;
1679    };
1680    if hex.len() != 64
1681        || !hex
1682            .bytes()
1683            .all(|byte| byte.is_ascii_digit() || matches!(byte, b'a'..=b'f'))
1684    {
1685        push_error(
1686            report,
1687            path,
1688            "BLAKE3 hash must contain 64 lowercase hex digits",
1689            None,
1690        );
1691    }
1692}
1693
1694fn validate_triggers(
1695    bundle: &WorkflowBundle,
1696    graph: &WorkflowGraph,
1697    report: &mut WorkflowBundleValidationReport,
1698) {
1699    if bundle.triggers.is_empty() {
1700        push_warning(report, "triggers", "bundle declares no triggers", None);
1701    }
1702    let mut ids = BTreeSet::new();
1703    for (index, trigger) in bundle.triggers.iter().enumerate() {
1704        let path = format!("triggers[{index}]");
1705        if trigger.id.trim().is_empty() {
1706            push_error(report, format!("{path}.id"), "trigger id is required", None);
1707        } else if !ids.insert(trigger.id.clone()) {
1708            push_error(
1709                report,
1710                format!("{path}.id"),
1711                format!("duplicate trigger id: {}", trigger.id),
1712                None,
1713            );
1714        }
1715        match trigger.kind.as_str() {
1716            "github" => {
1717                if trigger.provider.as_deref() != Some("github") {
1718                    push_error(
1719                        report,
1720                        format!("{path}.provider"),
1721                        "github triggers require provider=\"github\"",
1722                        None,
1723                    );
1724                }
1725                if trigger.events.is_empty() {
1726                    push_error(
1727                        report,
1728                        format!("{path}.events"),
1729                        "github triggers require at least one event",
1730                        None,
1731                    );
1732                }
1733            }
1734            "cron" if trigger.schedule.is_none() => push_error(
1735                report,
1736                format!("{path}.schedule"),
1737                "cron triggers require schedule",
1738                None,
1739            ),
1740            "delay" if trigger.delay.is_none() => push_error(
1741                report,
1742                format!("{path}.delay"),
1743                "delay triggers require delay",
1744                None,
1745            ),
1746            "webhook" if trigger.webhook_path.is_none() => push_error(
1747                report,
1748                format!("{path}.webhook_path"),
1749                "webhook triggers require webhook_path",
1750                None,
1751            ),
1752            "mcp" if trigger.mcp_tool.is_none() => push_error(
1753                report,
1754                format!("{path}.mcp_tool"),
1755                "mcp triggers require mcp_tool",
1756                None,
1757            ),
1758            "manual" => {}
1759            "" => push_error(
1760                report,
1761                format!("{path}.kind"),
1762                "trigger kind is required",
1763                None,
1764            ),
1765            other
1766                if !matches!(
1767                    other,
1768                    "github" | "cron" | "delay" | "webhook" | "mcp" | "manual"
1769                ) =>
1770            {
1771                push_error(
1772                    report,
1773                    format!("{path}.kind"),
1774                    format!("unsupported trigger kind: {other}"),
1775                    None,
1776                );
1777            }
1778            _ => {}
1779        }
1780        if let Some(node_id) = trigger.node_id.as_deref() {
1781            if !graph.nodes.contains_key(node_id) {
1782                push_error(
1783                    report,
1784                    format!("{path}.node_id"),
1785                    format!("trigger references unknown node: {node_id}"),
1786                    Some(node_id.to_string()),
1787                );
1788            }
1789        }
1790    }
1791}
1792
1793fn validate_prompt_capsules(
1794    bundle: &WorkflowBundle,
1795    graph: &WorkflowGraph,
1796    report: &mut WorkflowBundleValidationReport,
1797) {
1798    let trigger_ids: BTreeSet<&str> = bundle
1799        .triggers
1800        .iter()
1801        .map(|trigger| trigger.id.as_str())
1802        .collect();
1803    let mut node_refs = BTreeSet::new();
1804    for (key, capsule) in &bundle.prompt_capsules {
1805        let path = format!("prompt_capsules.{key}");
1806        if capsule.id.trim().is_empty() {
1807            push_error(
1808                report,
1809                format!("{path}.id"),
1810                "prompt capsule id is required",
1811                None,
1812            );
1813        } else if capsule.id != *key {
1814            push_error(
1815                report,
1816                format!("{path}.id"),
1817                "prompt capsule id must match its map key",
1818                None,
1819            );
1820        }
1821        if capsule.prompt.trim().is_empty() {
1822            push_error(
1823                report,
1824                format!("{path}.prompt"),
1825                "prompt capsule prompt is required",
1826                Some(capsule.node_id.clone()),
1827            );
1828        }
1829        if !graph.nodes.contains_key(&capsule.node_id) {
1830            push_error(
1831                report,
1832                format!("{path}.node_id"),
1833                format!(
1834                    "prompt capsule references unknown node: {}",
1835                    capsule.node_id
1836                ),
1837                Some(capsule.node_id.clone()),
1838            );
1839        }
1840        if !capsule.node_id.is_empty() && !node_refs.insert(capsule.node_id.clone()) {
1841            push_error(
1842                report,
1843                format!("{path}.node_id"),
1844                format!("multiple prompt capsules target node {}", capsule.node_id),
1845                Some(capsule.node_id.clone()),
1846            );
1847        }
1848        if let Some(trigger_id) = capsule.trigger_id.as_deref() {
1849            if !trigger_ids.contains(trigger_id) {
1850                push_error(
1851                    report,
1852                    format!("{path}.trigger_id"),
1853                    format!("prompt capsule references unknown trigger: {trigger_id}"),
1854                    Some(capsule.node_id.clone()),
1855                );
1856            }
1857        }
1858    }
1859}
1860
1861fn validate_policy(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1862    if !matches!(
1863        bundle.policy.autonomy_tier.as_str(),
1864        "shadow" | "suggest" | "act_with_approval" | "act_auto"
1865    ) {
1866        push_error(
1867            report,
1868            "policy.autonomy_tier",
1869            "autonomy_tier must be shadow, suggest, act_with_approval, or act_auto",
1870            None,
1871        );
1872    }
1873    if bundle.policy.retry.max_attempts == 0 {
1874        push_error(
1875            report,
1876            "policy.retry.max_attempts",
1877            "retry.max_attempts must be at least 1",
1878            None,
1879        );
1880    }
1881    if !matches!(
1882        bundle.policy.catchup.mode.as_str(),
1883        "none" | "latest" | "all"
1884    ) {
1885        push_error(
1886            report,
1887            "policy.catchup.mode",
1888            "catchup.mode must be none, latest, or all",
1889            None,
1890        );
1891    }
1892}
1893
1894fn validate_connectors(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1895    let mut ids = BTreeSet::new();
1896    let provider_ids: BTreeSet<&str> = bundle
1897        .connectors
1898        .iter()
1899        .map(|connector| connector.provider_id.as_str())
1900        .collect();
1901    for (index, connector) in bundle.connectors.iter().enumerate() {
1902        let path = format!("connectors[{index}]");
1903        if connector.id.trim().is_empty() {
1904            push_error(
1905                report,
1906                format!("{path}.id"),
1907                "connector id is required",
1908                None,
1909            );
1910        } else if !ids.insert(connector.id.clone()) {
1911            push_error(
1912                report,
1913                format!("{path}.id"),
1914                format!("duplicate connector id: {}", connector.id),
1915                None,
1916            );
1917        }
1918        if connector.provider_id.trim().is_empty() {
1919            push_error(
1920                report,
1921                format!("{path}.provider_id"),
1922                "connector provider_id is required",
1923                None,
1924            );
1925        }
1926    }
1927    for trigger in &bundle.triggers {
1928        if let Some(provider) = trigger.provider.as_deref() {
1929            if !provider_ids.contains(provider) {
1930                push_warning(
1931                    report,
1932                    "connectors",
1933                    format!(
1934                        "trigger {} references provider {provider} with no connector requirement",
1935                        trigger.id
1936                    ),
1937                    trigger.node_id.clone(),
1938                );
1939            }
1940        }
1941    }
1942}
1943
1944fn validate_environment(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1945    if !matches!(
1946        bundle.environment.worktree_policy.as_str(),
1947        "reuse_current" | "new_worktree" | "host_managed"
1948    ) {
1949        push_error(
1950            report,
1951            "environment.worktree_policy",
1952            "worktree_policy must be reuse_current, new_worktree, or host_managed",
1953            None,
1954        );
1955    }
1956}
1957
1958fn workflow_diagnostic_node_id(message: &str, graph: &WorkflowGraph) -> Option<String> {
1959    for prefix in [
1960        "node is unreachable: ",
1961        "edge.from references unknown node: ",
1962        "edge.to references unknown node: ",
1963        "entry node does not exist: ",
1964    ] {
1965        if let Some(node_id) = message.strip_prefix(prefix) {
1966            return Some(node_id.to_string());
1967        }
1968    }
1969    if let Some(rest) = message.strip_prefix("node ") {
1970        if let Some((node_id, _)) = rest.split_once(':') {
1971            return Some(node_id.to_string());
1972        }
1973    }
1974    graph
1975        .nodes
1976        .keys()
1977        .find(|node_id| message.contains(&format!("node {node_id}:")))
1978        .cloned()
1979}
1980
/// Builds the graph id for a workflow node: `node/<node_id>`.
fn workflow_graph_id(node_id: &str) -> String {
    let mut id = String::from("node/");
    id.push_str(node_id);
    id
}
1984
/// Builds the graph id for a trigger: `trigger/<trigger_id>`.
fn trigger_graph_id(trigger_id: &str) -> String {
    let mut id = String::from("trigger/");
    id.push_str(trigger_id);
    id
}
1988
/// Builds the graph id for a connector: `connector/<connector_id>`.
fn connector_graph_id(connector_id: &str) -> String {
    let mut id = String::from("connector/");
    id.push_str(connector_id);
    id
}
1992
/// Returns the fixed graph id `policy/catchup`.
fn catchup_graph_id() -> String {
    String::from("policy/catchup")
}
1996
/// Returns the fixed graph id `policy/dlq`.
fn dlq_graph_id() -> String {
    String::from("policy/dlq")
}
2000
/// Returns the fixed graph id `terminal/completed`.
fn terminal_completed_graph_id() -> String {
    String::from("terminal/completed")
}
2004
/// Returns the fixed graph id `terminal/failed`.
fn terminal_failed_graph_id() -> String {
    String::from("terminal/failed")
}
2008
/// Maps a workflow node `kind` onto the node type used in the bundle graph.
///
/// Known aliases collapse to one canonical name (for example `stage` becomes
/// `agent` and `hitl` becomes `approval`). A blank or whitespace-only kind
/// becomes `agent`. Any other kind is returned unchanged.
fn workflow_node_type(kind: &str) -> String {
    let canonical = if kind.trim().is_empty() {
        "agent"
    } else {
        match kind {
            "action" => "action",
            "stage" | "agent" => "agent",
            "subagent" | "worker" => "subagent",
            "wait" | "waitpoint" | "delay" => "wait",
            "approval" | "hitl" => "approval",
            "connector" | "connector_call" => "connector_call",
            "notification" | "notify" => "notification",
            "terminal" | "success" | "failure" => "terminal",
            unrecognised => unrecognised,
        }
    };
    String::from(canonical)
}
2024
2025fn workflow_node_label(node_id: &str, node: &super::WorkflowNode) -> String {
2026    node.task_label
2027        .clone()
2028        .or_else(|| node.prompt.clone())
2029        .map(|label| label.trim().to_string())
2030        .filter(|label| !label.is_empty())
2031        .unwrap_or_else(|| node_id.to_string())
2032}
2033
2034fn trigger_label(trigger: &WorkflowBundleTrigger) -> String {
2035    if !trigger.events.is_empty() {
2036        format!("{}: {}", trigger.kind, trigger.events.join(", "))
2037    } else if let Some(schedule) = trigger.schedule.as_deref() {
2038        format!("cron: {schedule}")
2039    } else if let Some(delay) = trigger.delay.as_deref() {
2040        format!("delay: {delay}")
2041    } else {
2042        trigger.id.clone()
2043    }
2044}
2045
2046fn connector_label(connector: &ConnectorRequirement) -> String {
2047    if connector.provider_id.is_empty() || connector.provider_id == connector.id {
2048        connector.id.clone()
2049    } else {
2050        format!("{} ({})", connector.id, connector.provider_id)
2051    }
2052}
2053
2054fn editable_field(
2055    id: impl Into<String>,
2056    label: impl Into<String>,
2057    json_pointer: impl Into<String>,
2058    value_type: impl Into<String>,
2059    required: bool,
2060    enum_values: &[&str],
2061) -> WorkflowBundleEditableField {
2062    WorkflowBundleEditableField {
2063        id: id.into(),
2064        label: label.into(),
2065        json_pointer: json_pointer.into(),
2066        value_type: value_type.into(),
2067        required,
2068        enum_values: enum_values
2069            .iter()
2070            .map(|value| (*value).to_string())
2071            .collect(),
2072    }
2073}
2074
/// Escapes `value` for use as a single JSON Pointer reference token:
/// `~` becomes `~0` and `/` becomes `~1`.
fn json_pointer_segment(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '~' => escaped.push_str("~0"),
            '/' => escaped.push_str("~1"),
            other => escaped.push(other),
        }
    }
    escaped
}
2078
2079fn trigger_editable_fields(
2080    index: usize,
2081    trigger: &WorkflowBundleTrigger,
2082) -> Vec<WorkflowBundleEditableField> {
2083    let base = format!("/triggers/{index}");
2084    let mut fields = vec![
2085        editable_field(
2086            format!("trigger.{}.kind", trigger.id),
2087            "Trigger kind",
2088            format!("{base}/kind"),
2089            "enum",
2090            true,
2091            &["github", "cron", "delay", "manual", "webhook", "mcp"],
2092        ),
2093        editable_field(
2094            format!("trigger.{}.node_id", trigger.id),
2095            "Target node",
2096            format!("{base}/node_id"),
2097            "string",
2098            false,
2099            &[],
2100        ),
2101    ];
2102    if trigger.provider.is_some() || trigger.kind == "github" {
2103        fields.push(editable_field(
2104            format!("trigger.{}.provider", trigger.id),
2105            "Provider",
2106            format!("{base}/provider"),
2107            "string",
2108            trigger.kind == "github",
2109            &[],
2110        ));
2111    }
2112    for (field, label, value_type) in [
2113        ("events", "Events", "list"),
2114        ("schedule", "Schedule", "string"),
2115        ("delay", "Delay", "string"),
2116        ("webhook_path", "Webhook path", "string"),
2117        ("mcp_tool", "MCP tool", "string"),
2118        ("resume_key", "Resume key", "string"),
2119        ("metadata", "Metadata", "object"),
2120    ] {
2121        fields.push(editable_field(
2122            format!("trigger.{}.{}", trigger.id, field),
2123            label,
2124            format!("{base}/{field}"),
2125            value_type,
2126            false,
2127            &[],
2128        ));
2129    }
2130    fields
2131}
2132
2133fn workflow_node_editable_fields(
2134    node_id: &str,
2135    capsule_id: Option<&String>,
2136) -> Vec<WorkflowBundleEditableField> {
2137    let escaped_node = json_pointer_segment(node_id);
2138    let mut fields = vec![
2139        editable_field(
2140            format!("workflow.{node_id}.task_label"),
2141            "Task label",
2142            format!("/workflow/nodes/{escaped_node}/task_label"),
2143            "string",
2144            false,
2145            &[],
2146        ),
2147        editable_field(
2148            format!("workflow.{node_id}.prompt"),
2149            "Prompt",
2150            format!("/workflow/nodes/{escaped_node}/prompt"),
2151            "string",
2152            false,
2153            &[],
2154        ),
2155        editable_field(
2156            format!("workflow.{node_id}.system"),
2157            "System prompt",
2158            format!("/workflow/nodes/{escaped_node}/system"),
2159            "string",
2160            false,
2161            &[],
2162        ),
2163        editable_field(
2164            format!("workflow.{node_id}.model_policy"),
2165            "Model policy",
2166            format!("/workflow/nodes/{escaped_node}/model_policy"),
2167            "object",
2168            false,
2169            &[],
2170        ),
2171        editable_field(
2172            format!("workflow.{node_id}.tools"),
2173            "Tool policy",
2174            format!("/workflow/nodes/{escaped_node}/tools"),
2175            "any",
2176            false,
2177            &[],
2178        ),
2179        editable_field(
2180            format!("workflow.{node_id}.capability_policy"),
2181            "Capability policy",
2182            format!("/workflow/nodes/{escaped_node}/capability_policy"),
2183            "object",
2184            false,
2185            &[],
2186        ),
2187        editable_field(
2188            format!("workflow.{node_id}.approval_policy"),
2189            "Approval policy",
2190            format!("/workflow/nodes/{escaped_node}/approval_policy"),
2191            "object",
2192            false,
2193            &[],
2194        ),
2195        editable_field(
2196            format!("workflow.{node_id}.retry_policy"),
2197            "Retry policy",
2198            format!("/workflow/nodes/{escaped_node}/retry_policy"),
2199            "object",
2200            false,
2201            &[],
2202        ),
2203    ];
2204    if let Some(capsule_id) = capsule_id {
2205        let escaped_capsule = json_pointer_segment(capsule_id);
2206        fields.extend([
2207            editable_field(
2208                format!("prompt_capsule.{capsule_id}.prompt"),
2209                "Prompt capsule",
2210                format!("/prompt_capsules/{escaped_capsule}/prompt"),
2211                "string",
2212                true,
2213                &[],
2214            ),
2215            editable_field(
2216                format!("prompt_capsule.{capsule_id}.system"),
2217                "Prompt capsule system",
2218                format!("/prompt_capsules/{escaped_capsule}/system"),
2219                "string",
2220                false,
2221                &[],
2222            ),
2223            editable_field(
2224                format!("prompt_capsule.{capsule_id}.context"),
2225                "Prompt capsule context",
2226                format!("/prompt_capsules/{escaped_capsule}/context"),
2227                "object",
2228                false,
2229                &[],
2230            ),
2231            editable_field(
2232                format!("prompt_capsule.{capsule_id}.trigger_id"),
2233                "Prompt capsule trigger",
2234                format!("/prompt_capsules/{escaped_capsule}/trigger_id"),
2235                "string",
2236                false,
2237                &[],
2238            ),
2239        ]);
2240    }
2241    fields
2242}
2243
2244fn connector_editable_fields(
2245    index: usize,
2246    connector: &ConnectorRequirement,
2247) -> Vec<WorkflowBundleEditableField> {
2248    let base = format!("/connectors/{index}");
2249    [
2250        ("id", "Connector id", "string", true),
2251        ("provider_id", "Provider id", "string", true),
2252        ("scopes", "Scopes", "list", false),
2253        ("setup_required", "Setup required", "bool", false),
2254        ("status_required", "Status required", "bool", false),
2255    ]
2256    .into_iter()
2257    .map(|(field, label, value_type, required)| {
2258        editable_field(
2259            format!("connector.{}.{}", connector.id, field),
2260            label,
2261            format!("{base}/{field}"),
2262            value_type,
2263            required,
2264            &[],
2265        )
2266    })
2267    .collect()
2268}
2269
2270fn retry_editable_fields() -> Vec<WorkflowBundleEditableField> {
2271    vec![
2272        editable_field(
2273            "policy.retry.max_attempts",
2274            "Retry attempts",
2275            "/policy/retry/max_attempts",
2276            "integer",
2277            true,
2278            &[],
2279        ),
2280        editable_field(
2281            "policy.retry.backoff",
2282            "Retry backoff",
2283            "/policy/retry/backoff",
2284            "string",
2285            true,
2286            &[],
2287        ),
2288    ]
2289}
2290
2291fn catchup_editable_fields() -> Vec<WorkflowBundleEditableField> {
2292    vec![
2293        editable_field(
2294            "policy.catchup.mode",
2295            "Catchup mode",
2296            "/policy/catchup/mode",
2297            "enum",
2298            true,
2299            &["none", "latest", "all"],
2300        ),
2301        editable_field(
2302            "policy.catchup.max_events",
2303            "Catchup max events",
2304            "/policy/catchup/max_events",
2305            "integer",
2306            false,
2307            &[],
2308        ),
2309    ]
2310}
2311
2312fn render_workflow_bundle_mermaid(
2313    nodes: &[WorkflowBundleGraphNode],
2314    edges: &[WorkflowBundleGraphEdge],
2315) -> String {
2316    let mut lines = vec!["flowchart TD".to_string()];
2317    for node in nodes {
2318        lines.push(format!(
2319            "  {}[\"{}\"]",
2320            mermaid_id(&node.id),
2321            mermaid_label(&format!("{}: {}", node.node_type, node.label))
2322        ));
2323    }
2324    for edge in edges {
2325        let label = edge
2326            .label
2327            .as_deref()
2328            .or(edge.branch.as_deref())
2329            .map(mermaid_label);
2330        match label {
2331            Some(label) if !label.is_empty() => lines.push(format!(
2332                "  {} -->|{}| {}",
2333                mermaid_id(&edge.from),
2334                label,
2335                mermaid_id(&edge.to)
2336            )),
2337            _ => lines.push(format!(
2338                "  {} --> {}",
2339                mermaid_id(&edge.from),
2340                mermaid_id(&edge.to)
2341            )),
2342        }
2343    }
2344    lines.join("\n")
2345}
2346
2347fn mermaid_id(value: &str) -> String {
2348    let digest = Sha256::digest(value.as_bytes());
2349    let suffix = digest
2350        .iter()
2351        .take(4)
2352        .map(|byte| format!("{byte:02x}"))
2353        .collect::<String>();
2354    let mut out = format!("n_{suffix}_");
2355    for ch in value.chars() {
2356        if ch.is_ascii_alphanumeric() {
2357            out.push(ch);
2358        } else {
2359            out.push('_');
2360        }
2361    }
2362    out
2363}
2364
/// Prepares free-form text for embedding in a single-line Mermaid label.
///
/// Backslashes are doubled and double quotes are prefixed with a backslash.
/// Every line break (`\r\n`, `\n`, or a lone `\r`) then becomes a single
/// space so that the label cannot spill onto a second line of the diagram
/// source.
///
/// NOTE(review): Mermaid's documented way to embed a double quote is the
/// `#quot;` entity; confirm that the renderers consuming this output accept
/// the backslash form before relying on labels that contain quotes.
fn mermaid_label(value: &str) -> String {
    value
        .replace('\\', "\\\\")
        .replace('"', "\\\"")
        // Collapse CRLF first so a Windows line ending yields one space, not two.
        .replace("\r\n", " ")
        .replace('\n', " ")
        .replace('\r', " ")
}
2371
2372fn triggers_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, Vec<String>> {
2373    let mut by_node: BTreeMap<String, Vec<String>> = BTreeMap::new();
2374    for trigger in &bundle.triggers {
2375        if let Some(node_id) = trigger.node_id.as_ref() {
2376            by_node
2377                .entry(node_id.clone())
2378                .or_default()
2379                .push(trigger.id.clone());
2380        }
2381    }
2382    by_node
2383}
2384
2385fn capsules_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, String> {
2386    bundle
2387        .prompt_capsules
2388        .iter()
2389        .map(|(id, capsule)| (capsule.node_id.clone(), id.clone()))
2390        .collect()
2391}
2392
2393fn execution_order(graph: &WorkflowGraph) -> Vec<String> {
2394    let outgoing =
2395        graph
2396            .edges
2397            .iter()
2398            .fold(BTreeMap::<String, Vec<String>>::new(), |mut acc, edge| {
2399                acc.entry(edge.from.clone())
2400                    .or_default()
2401                    .push(edge.to.clone());
2402                acc
2403            });
2404    let mut seen = BTreeSet::new();
2405    let mut queue = VecDeque::from([graph.entry.clone()]);
2406    let mut order = Vec::new();
2407    while let Some(node_id) = queue.pop_front() {
2408        if !graph.nodes.contains_key(&node_id) || !seen.insert(node_id.clone()) {
2409            continue;
2410        }
2411        order.push(node_id.clone());
2412        if let Some(next) = outgoing.get(&node_id) {
2413            let mut next = next.clone();
2414            next.sort();
2415            for child in next {
2416                queue.push_back(child);
2417            }
2418        }
2419    }
2420    order
2421}
2422
2423fn default_run_id(bundle: &WorkflowBundle, graph_digest: &str) -> String {
2424    let suffix = graph_digest
2425        .strip_prefix("sha256:")
2426        .unwrap_or(graph_digest)
2427        .chars()
2428        .take(12)
2429        .collect::<String>();
2430    format!("bundle_run_{}_{}", sanitize_id(&bundle.id), suffix)
2431}
2432
/// Replaces every character of `value` that is not an ASCII letter, ASCII
/// digit, `_`, or `-` with `_`. The number of characters is preserved.
fn sanitize_id(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-');
        sanitized.push(if keep { ch } else { '_' });
    }
    sanitized
}
2445
2446fn push_error(
2447    report: &mut WorkflowBundleValidationReport,
2448    path: impl Into<String>,
2449    message: impl Into<String>,
2450    node_id: Option<String>,
2451) {
2452    report.errors.push(WorkflowBundleDiagnostic {
2453        severity: "error".to_string(),
2454        path: path.into(),
2455        message: message.into(),
2456        node_id,
2457    });
2458}
2459
2460fn push_warning(
2461    report: &mut WorkflowBundleValidationReport,
2462    path: impl Into<String>,
2463    message: impl Into<String>,
2464    node_id: Option<String>,
2465) {
2466    report.warnings.push(WorkflowBundleDiagnostic {
2467        severity: "warning".to_string(),
2468        path: path.into(),
2469        message: message.into(),
2470        node_id,
2471    });
2472}
2473
2474#[cfg(test)]
2475mod tests;