Skip to main content

harn_vm/orchestration/
workflow_bundle.rs

1//! Portable workflow bundle contract, `.harnpack` container helpers, and
2//! deterministic local receipts.
3
4use std::collections::{BTreeMap, BTreeSet, VecDeque};
5use std::fs;
6use std::io::{Cursor, Read};
7use std::path::{Component, Path, PathBuf};
8
9use ed25519_dalek::{Signature, Verifier, VerifyingKey};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12
13use super::{validate_workflow, WorkflowEdge, WorkflowGraph};
14use crate::tool_annotations::ToolAnnotations;
15
16pub const WORKFLOW_BUNDLE_SCHEMA_VERSION: u32 = 2;
17pub const WORKFLOW_BUNDLE_RECEIPT_TYPE: &str = "harn.workflow_bundle.run";
18pub const HARNPACK_MANIFEST_PATH: &str = "harnpack.json";
19
20const DEFAULT_HARNPACK_FILE_MODE: u32 = 0o644;
21
22/// Maximum decompressed size of a `.harnpack` archive. Matches the
23/// VM-level decompression cap in `stdlib/compression.rs`. Keeps a
24/// malicious bundle from exhausting memory during ingest (workflow
25/// bundles ride this exact path when harn-cloud accepts a plan
26/// transfer).
27const MAX_HARNPACK_DECOMPRESSED_BYTES: u64 = 100 * 1024 * 1024;
28
29/// Decompress zstd-compressed harnpack bytes, refusing to produce more
30/// than [`MAX_HARNPACK_DECOMPRESSED_BYTES`] of output. Returns
31/// [`WorkflowBundleErrorKind::InvalidArchive`] on overflow so callers
32/// can surface a clean rejection instead of OOMing the host.
33fn decompress_harnpack_zstd(bytes: &[u8]) -> Result<Vec<u8>, WorkflowBundleError> {
34    let mut decoder = zstd::stream::Decoder::new(Cursor::new(bytes))?;
35    let mut output = Vec::new();
36    let mut limited = (&mut decoder).take(MAX_HARNPACK_DECOMPRESSED_BYTES.saturating_add(1));
37    limited.read_to_end(&mut output)?;
38    if output.len() as u64 > MAX_HARNPACK_DECOMPRESSED_BYTES {
39        return Err(WorkflowBundleError::new(
40            WorkflowBundleErrorKind::InvalidArchive,
41            format!(
42                "harnpack zstd payload decompressed past max size ({} bytes); refusing to extract",
43                MAX_HARNPACK_DECOMPRESSED_BYTES
44            ),
45        ));
46    }
47    Ok(output)
48}
49
50#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
51#[serde(default)]
52pub struct WorkflowBundle {
53    pub schema_version: u32,
54    pub entrypoint: PathBuf,
55    pub transitive_modules: Vec<ModuleEntry>,
56    pub stdlib_version: String,
57    pub harn_version: String,
58    pub provider_catalog_hash: String,
59    pub tool_manifest: Vec<ToolEntry>,
60    pub sbom: SBOMDoc,
61    pub signature: Option<Ed25519Signature>,
62    pub parent_trust_record_id: Option<String>,
63    pub id: String,
64    pub name: Option<String>,
65    pub version: String,
66    pub triggers: Vec<WorkflowBundleTrigger>,
67    pub workflow: WorkflowGraph,
68    pub prompt_capsules: BTreeMap<String, PromptCapsule>,
69    pub policy: WorkflowBundlePolicy,
70    pub connectors: Vec<ConnectorRequirement>,
71    pub environment: EnvironmentRequirements,
72    pub receipts: WorkflowBundleReplayMetadata,
73    pub metadata: BTreeMap<String, serde_json::Value>,
74}
75
76impl Default for WorkflowBundle {
77    fn default() -> Self {
78        Self {
79            schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
80            entrypoint: PathBuf::new(),
81            transitive_modules: Vec::new(),
82            stdlib_version: env!("CARGO_PKG_VERSION").to_string(),
83            harn_version: env!("CARGO_PKG_VERSION").to_string(),
84            provider_catalog_hash: String::new(),
85            tool_manifest: Vec::new(),
86            sbom: SBOMDoc::default(),
87            signature: None,
88            parent_trust_record_id: None,
89            id: String::new(),
90            name: None,
91            version: String::new(),
92            triggers: Vec::new(),
93            workflow: WorkflowGraph::default(),
94            prompt_capsules: BTreeMap::new(),
95            policy: WorkflowBundlePolicy::default(),
96            connectors: Vec::new(),
97            environment: EnvironmentRequirements::default(),
98            receipts: WorkflowBundleReplayMetadata::default(),
99            metadata: BTreeMap::new(),
100        }
101    }
102}
103
104#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
105#[serde(default)]
106pub struct ModuleEntry {
107    pub path: PathBuf,
108    pub source_hash_blake3: String,
109    pub harnbc_hash_blake3: String,
110}
111
112#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
113#[serde(default)]
114pub struct ToolEntry {
115    pub name: String,
116    pub provider: Option<String>,
117    pub annotations: Option<ToolAnnotations>,
118    pub schema_hash_blake3: Option<String>,
119    pub metadata: BTreeMap<String, String>,
120}
121
122#[allow(clippy::upper_case_acronyms)]
123#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
124#[serde(default)]
125pub struct SBOMDoc {
126    pub format: String,
127    pub version: String,
128    pub packages: Vec<SBOMPackage>,
129    pub relationships: Vec<SBOMRelationship>,
130}
131
132#[allow(clippy::upper_case_acronyms)]
133#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
134#[serde(default)]
135pub struct SBOMPackage {
136    pub name: String,
137    pub version: Option<String>,
138    pub package_hash_blake3: Option<String>,
139    pub license: Option<String>,
140}
141
142#[allow(clippy::upper_case_acronyms)]
143#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
144#[serde(default)]
145pub struct SBOMRelationship {
146    pub from: String,
147    pub to: String,
148    pub relationship_type: String,
149}
150
151#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
152#[serde(default)]
153pub struct Ed25519Signature {
154    pub key_id: Option<String>,
155    pub public_key: String,
156    pub signature: String,
157    pub manifest_hash_blake3: String,
158    pub algorithm: String,
159}
160
161#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
162#[serde(default)]
163pub struct WorkflowBundleTrigger {
164    pub id: String,
165    pub kind: String,
166    pub provider: Option<String>,
167    pub events: Vec<String>,
168    pub schedule: Option<String>,
169    pub delay: Option<String>,
170    pub webhook_path: Option<String>,
171    pub mcp_tool: Option<String>,
172    pub resume_key: Option<String>,
173    pub node_id: Option<String>,
174    pub metadata: BTreeMap<String, String>,
175}
176
177#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
178#[serde(default)]
179pub struct PromptCapsule {
180    pub id: String,
181    pub node_id: String,
182    pub trigger_id: Option<String>,
183    pub prompt: String,
184    pub system: Option<String>,
185    pub context: BTreeMap<String, serde_json::Value>,
186}
187
188#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
189#[serde(default)]
190pub struct WorkflowBundlePolicy {
191    pub autonomy_tier: String,
192    pub tool_policy: BTreeMap<String, serde_json::Value>,
193    pub approval_required: Vec<String>,
194    pub retry: RetryPolicySpec,
195    pub catchup: CatchupPolicySpec,
196}
197
198impl Default for WorkflowBundlePolicy {
199    fn default() -> Self {
200        Self {
201            autonomy_tier: "act_with_approval".to_string(),
202            tool_policy: BTreeMap::new(),
203            approval_required: Vec::new(),
204            retry: RetryPolicySpec::default(),
205            catchup: CatchupPolicySpec::default(),
206        }
207    }
208}
209
210#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
211#[serde(default)]
212pub struct RetryPolicySpec {
213    pub max_attempts: u32,
214    pub backoff: String,
215}
216
217impl Default for RetryPolicySpec {
218    fn default() -> Self {
219        Self {
220            max_attempts: 1,
221            backoff: "none".to_string(),
222        }
223    }
224}
225
226#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
227#[serde(default)]
228pub struct CatchupPolicySpec {
229    pub mode: String,
230    pub max_events: Option<u32>,
231}
232
233impl Default for CatchupPolicySpec {
234    fn default() -> Self {
235        Self {
236            mode: "latest".to_string(),
237            max_events: Some(1),
238        }
239    }
240}
241
242#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
243#[serde(default)]
244pub struct ConnectorRequirement {
245    pub id: String,
246    pub provider_id: String,
247    pub scopes: Vec<String>,
248    pub setup_required: bool,
249    pub status_required: bool,
250}
251
252#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
253#[serde(default)]
254pub struct EnvironmentRequirements {
255    pub repo_setup_profile: Option<String>,
256    pub worktree_policy: String,
257    pub command_gates: Vec<String>,
258}
259
260impl Default for EnvironmentRequirements {
261    fn default() -> Self {
262        Self {
263            repo_setup_profile: None,
264            worktree_policy: "host_managed".to_string(),
265            command_gates: Vec::new(),
266        }
267    }
268}
269
270#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
271#[serde(default)]
272pub struct WorkflowBundleReplayMetadata {
273    pub run_id: Option<String>,
274    pub event_ids: Vec<String>,
275    pub workflow_version: Option<usize>,
276    pub graph_digest: Option<String>,
277}
278
279#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
280#[serde(default)]
281pub struct WorkflowBundleDiagnostic {
282    pub severity: String,
283    pub path: String,
284    pub message: String,
285    pub node_id: Option<String>,
286}
287
288#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
289#[serde(default)]
290pub struct WorkflowBundleValidationReport {
291    pub valid: bool,
292    pub bundle_id: String,
293    pub workflow_id: String,
294    pub graph_digest: String,
295    pub errors: Vec<WorkflowBundleDiagnostic>,
296    pub warnings: Vec<WorkflowBundleDiagnostic>,
297}
298
299#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
300pub struct WorkflowBundlePreview {
301    pub schema_version: u32,
302    pub bundle_id: String,
303    pub bundle_version: String,
304    pub workflow_id: String,
305    pub workflow_version: usize,
306    pub graph_digest: String,
307    pub validation: WorkflowBundleValidationReport,
308    pub graph: WorkflowBundleGraphExport,
309    pub mermaid: String,
310    pub triggers: Vec<WorkflowBundleTrigger>,
311    pub connectors: Vec<ConnectorRequirement>,
312    pub environment: EnvironmentRequirements,
313    pub nodes: Vec<WorkflowBundlePreviewNode>,
314    pub edges: Vec<WorkflowEdge>,
315}
316
317#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
318pub struct WorkflowBundlePreviewNode {
319    pub id: String,
320    pub kind: String,
321    pub label: Option<String>,
322    pub prompt_capsule: Option<String>,
323    pub trigger_ids: Vec<String>,
324    pub outgoing: Vec<String>,
325}
326
327#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
328pub struct WorkflowBundleGraphExport {
329    pub schema_version: u32,
330    pub graph_id: String,
331    pub graph_digest: String,
332    pub nodes: Vec<WorkflowBundleGraphNode>,
333    pub edges: Vec<WorkflowBundleGraphEdge>,
334    pub diagnostics: Vec<WorkflowBundleGraphDiagnostic>,
335    pub editable_fields: Vec<WorkflowBundleEditableField>,
336    pub mermaid: String,
337}
338
339#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
340pub struct WorkflowBundleGraphNode {
341    pub id: String,
342    pub node_type: String,
343    pub label: String,
344    pub workflow_node_id: Option<String>,
345    pub trigger_id: Option<String>,
346    pub connector_id: Option<String>,
347    pub editable_fields: Vec<WorkflowBundleEditableField>,
348    pub metadata: BTreeMap<String, serde_json::Value>,
349}
350
351#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
352pub struct WorkflowBundleGraphEdge {
353    pub from: String,
354    pub to: String,
355    pub label: Option<String>,
356    pub branch: Option<String>,
357}
358
359#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
360pub struct WorkflowBundleGraphDiagnostic {
361    pub severity: String,
362    pub path: String,
363    pub message: String,
364    pub node_id: Option<String>,
365    pub graph_node_id: Option<String>,
366}
367
368#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
369pub struct WorkflowBundleEditableField {
370    pub id: String,
371    pub label: String,
372    pub json_pointer: String,
373    pub value_type: String,
374    pub required: bool,
375    pub enum_values: Vec<String>,
376}
377
378#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
379#[serde(default)]
380pub struct WorkflowBundleRunRequest {
381    pub trigger_id: Option<String>,
382    pub event_id: Option<String>,
383}
384
385#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
386pub struct WorkflowBundleRunReceipt {
387    pub schema_version: u32,
388    pub receipt_type: String,
389    pub bundle_id: String,
390    pub bundle_version: String,
391    pub workflow_id: String,
392    pub workflow_version: usize,
393    pub graph_digest: String,
394    pub run_id: String,
395    pub trigger_id: Option<String>,
396    pub event_ids: Vec<String>,
397    pub status: String,
398    pub executed_nodes: Vec<WorkflowBundleRunNodeReceipt>,
399    pub policy: WorkflowBundlePolicy,
400    pub connectors: Vec<ConnectorRequirement>,
401    pub environment: EnvironmentRequirements,
402}
403
404#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
405pub struct WorkflowBundleRunNodeReceipt {
406    pub node_id: String,
407    pub kind: String,
408    pub prompt_capsule: Option<String>,
409    pub status: String,
410}
411
412#[derive(Clone, Debug, PartialEq, Eq)]
413pub struct HarnpackEntry {
414    pub path: PathBuf,
415    pub bytes: Vec<u8>,
416    pub mode: u32,
417}
418
419impl HarnpackEntry {
420    pub fn new(path: impl Into<PathBuf>, bytes: impl Into<Vec<u8>>) -> Self {
421        Self {
422            path: path.into(),
423            bytes: bytes.into(),
424            mode: DEFAULT_HARNPACK_FILE_MODE,
425        }
426    }
427
428    pub fn with_mode(mut self, mode: u32) -> Self {
429        self.mode = mode;
430        self
431    }
432}
433
434#[derive(Clone, Debug, PartialEq)]
435pub struct HarnpackArchive {
436    pub manifest: WorkflowBundle,
437    pub contents: Vec<HarnpackEntry>,
438}
439
440#[derive(Clone, Debug, PartialEq, Eq)]
441pub enum WorkflowBundleErrorKind {
442    Io,
443    Json,
444    MissingSchemaVersion,
445    UnsupportedSchemaVersion { actual: u32, expected: u32 },
446    InvalidArchive,
447    DuplicateArchiveEntry,
448    UnsafeArchivePath,
449    InvalidSignature,
450}
451
452#[derive(Clone, Debug, PartialEq, Eq)]
453pub struct WorkflowBundleError {
454    pub kind: WorkflowBundleErrorKind,
455    pub message: String,
456}
457
458impl WorkflowBundleError {
459    fn new(kind: WorkflowBundleErrorKind, message: impl Into<String>) -> Self {
460        Self {
461            kind,
462            message: message.into(),
463        }
464    }
465
466    fn unsupported_schema_version(actual: u32) -> Self {
467        Self::new(
468            WorkflowBundleErrorKind::UnsupportedSchemaVersion {
469                actual,
470                expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
471            },
472            format!(
473                "unsupported workflow bundle schema_version {actual}; expected {}",
474                WORKFLOW_BUNDLE_SCHEMA_VERSION
475            ),
476        )
477    }
478}
479
480impl std::fmt::Display for WorkflowBundleError {
481    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
482        self.message.fmt(f)
483    }
484}
485
486impl std::error::Error for WorkflowBundleError {}
487
488impl From<std::io::Error> for WorkflowBundleError {
489    fn from(error: std::io::Error) -> Self {
490        Self::new(WorkflowBundleErrorKind::Io, error.to_string())
491    }
492}
493
494impl From<serde_json::Error> for WorkflowBundleError {
495    fn from(error: serde_json::Error) -> Self {
496        Self::new(WorkflowBundleErrorKind::Json, error.to_string())
497    }
498}
499
500pub fn load_workflow_bundle(path: &Path) -> Result<WorkflowBundle, WorkflowBundleError> {
501    let bytes = fs::read(path)?;
502    if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
503        read_harnpack(&bytes).map(|archive| archive.manifest)
504    } else {
505        parse_workflow_bundle_manifest(&bytes)
506    }
507}
508
509/// Read a manifest from any prior or current schema version without
510/// rejecting on a `schema_version` mismatch. Used by `harn pack
511/// --upgrade` to read v1 bundles before re-emitting them under the v2
512/// shape. Fields the older schema didn't carry deserialize to their
513/// type defaults via `#[serde(default)]`.
514pub fn read_workflow_bundle_manifest_any_version(
515    bytes: &[u8],
516) -> Result<WorkflowBundle, WorkflowBundleError> {
517    let value: serde_json::Value = serde_json::from_slice(bytes)?;
518    if value.get("schema_version").is_none() {
519        return Err(WorkflowBundleError::new(
520            WorkflowBundleErrorKind::MissingSchemaVersion,
521            "workflow bundle manifest is missing schema_version",
522        ));
523    }
524    serde_json::from_value(value).map_err(Into::into)
525}
526
527/// Variant of [`load_workflow_bundle`] that accepts any historical
528/// schema version. Reads the manifest from a `.harnpack` archive or a
529/// bare JSON manifest as appropriate.
530pub fn load_workflow_bundle_any_version(
531    path: &Path,
532) -> Result<WorkflowBundle, WorkflowBundleError> {
533    let bytes = fs::read(path)?;
534    if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
535        let tar_bytes = decompress_harnpack_zstd(&bytes)?;
536        let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
537        for entry in archive.entries()? {
538            let mut entry = entry?;
539            if entry.header().entry_type().is_dir() {
540                continue;
541            }
542            let path = normalize_archive_path(entry.path()?.as_ref())?;
543            if path == HARNPACK_MANIFEST_PATH {
544                let mut entry_bytes = Vec::new();
545                entry.read_to_end(&mut entry_bytes)?;
546                return read_workflow_bundle_manifest_any_version(&entry_bytes);
547            }
548        }
549        Err(WorkflowBundleError::new(
550            WorkflowBundleErrorKind::InvalidArchive,
551            format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
552        ))
553    } else {
554        read_workflow_bundle_manifest_any_version(&bytes)
555    }
556}
557
558pub fn parse_workflow_bundle_manifest(bytes: &[u8]) -> Result<WorkflowBundle, WorkflowBundleError> {
559    let value: serde_json::Value = serde_json::from_slice(bytes)?;
560    let schema_version = value
561        .get("schema_version")
562        .and_then(serde_json::Value::as_u64)
563        .ok_or_else(|| {
564            WorkflowBundleError::new(
565                WorkflowBundleErrorKind::MissingSchemaVersion,
566                "workflow bundle manifest is missing numeric schema_version",
567            )
568        })?;
569    let actual = u32::try_from(schema_version).map_err(|_| {
570        WorkflowBundleError::new(
571            WorkflowBundleErrorKind::UnsupportedSchemaVersion {
572                actual: u32::MAX,
573                expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
574            },
575            format!(
576                "unsupported workflow bundle schema_version {schema_version}; expected {}",
577                WORKFLOW_BUNDLE_SCHEMA_VERSION
578            ),
579        )
580    })?;
581    if actual != WORKFLOW_BUNDLE_SCHEMA_VERSION {
582        return Err(WorkflowBundleError::unsupported_schema_version(actual));
583    }
584    serde_json::from_value(value).map_err(Into::into)
585}
586
587pub fn canonical_workflow_bundle_manifest_bytes(
588    bundle: &WorkflowBundle,
589) -> Result<Vec<u8>, WorkflowBundleError> {
590    serde_json::to_vec(&canonical_workflow_bundle_manifest(bundle)).map_err(Into::into)
591}
592
593pub fn workflow_bundle_hash(
594    bundle: &WorkflowBundle,
595    contents: &[HarnpackEntry],
596) -> Result<String, WorkflowBundleError> {
597    let mut hasher = blake3::Hasher::new();
598    let mut canonical = canonical_workflow_bundle_manifest(bundle);
599    canonical.signature = None;
600    let manifest_bytes = serde_json::to_vec(&canonical)?;
601    hasher.update(&manifest_bytes);
602
603    let mut content_hashes = contents
604        .iter()
605        .map(|entry| blake3_hash_bytes(&entry.bytes))
606        .collect::<Vec<_>>();
607    content_hashes.sort();
608    for content_hash in content_hashes {
609        hasher.update(b"\n");
610        hasher.update(content_hash.as_bytes());
611    }
612
613    Ok(blake3_digest_string(hasher.finalize()))
614}
615
616pub fn verify_workflow_bundle_signature(
617    bundle: &WorkflowBundle,
618    contents: &[HarnpackEntry],
619) -> Result<(), WorkflowBundleError> {
620    let signature = bundle.signature.as_ref().ok_or_else(|| {
621        WorkflowBundleError::new(
622            WorkflowBundleErrorKind::InvalidSignature,
623            "workflow bundle is unsigned",
624        )
625    })?;
626    if signature.algorithm != "ed25519" {
627        return Err(WorkflowBundleError::new(
628            WorkflowBundleErrorKind::InvalidSignature,
629            format!(
630                "unsupported workflow bundle signature algorithm {}",
631                signature.algorithm
632            ),
633        ));
634    }
635    let expected_hash = workflow_bundle_hash(bundle, contents)?;
636    if signature.manifest_hash_blake3 != expected_hash {
637        return Err(WorkflowBundleError::new(
638            WorkflowBundleErrorKind::InvalidSignature,
639            format!(
640                "workflow bundle signature hash mismatch; expected {expected_hash}, found {}",
641                signature.manifest_hash_blake3
642            ),
643        ));
644    }
645    let public_key_bytes = decode_hex_exact::<32>(
646        "workflow bundle signature public_key",
647        &signature.public_key,
648    )?;
649    let signature_bytes =
650        decode_hex_exact::<64>("workflow bundle signature", &signature.signature)?;
651    let verifying_key = VerifyingKey::from_bytes(&public_key_bytes).map_err(|error| {
652        WorkflowBundleError::new(
653            WorkflowBundleErrorKind::InvalidSignature,
654            format!("workflow bundle signature public_key is invalid Ed25519: {error}"),
655        )
656    })?;
657    let ed25519_signature = Signature::from_bytes(&signature_bytes);
658    verifying_key
659        .verify(expected_hash.as_bytes(), &ed25519_signature)
660        .map_err(|error| {
661            WorkflowBundleError::new(
662                WorkflowBundleErrorKind::InvalidSignature,
663                format!("workflow bundle signature failed Ed25519 verification: {error}"),
664            )
665        })
666}
667
668pub fn build_harnpack(
669    bundle: &WorkflowBundle,
670    contents: &[HarnpackEntry],
671) -> Result<Vec<u8>, WorkflowBundleError> {
672    let manifest_bytes = canonical_workflow_bundle_manifest_bytes(bundle)?;
673    let mut entries = contents
674        .iter()
675        .map(|entry| {
676            normalize_archive_path(&entry.path).map(|path| (path, entry.bytes.clone(), entry.mode))
677        })
678        .collect::<Result<Vec<_>, _>>()?;
679    entries.sort_by(|left, right| left.0.cmp(&right.0));
680
681    let mut seen = BTreeSet::new();
682    for (path, _, _) in &entries {
683        if path == HARNPACK_MANIFEST_PATH {
684            return Err(WorkflowBundleError::new(
685                WorkflowBundleErrorKind::DuplicateArchiveEntry,
686                format!("archive content cannot replace {HARNPACK_MANIFEST_PATH}"),
687            ));
688        }
689        if !seen.insert(path.clone()) {
690            return Err(WorkflowBundleError::new(
691                WorkflowBundleErrorKind::DuplicateArchiveEntry,
692                format!("duplicate archive entry: {path}"),
693            ));
694        }
695    }
696
697    let mut tar_bytes = Vec::new();
698    {
699        let mut builder = tar::Builder::new(&mut tar_bytes);
700        append_harnpack_entry(
701            &mut builder,
702            HARNPACK_MANIFEST_PATH,
703            &manifest_bytes,
704            DEFAULT_HARNPACK_FILE_MODE,
705        )?;
706        for (path, bytes, mode) in entries {
707            append_harnpack_entry(&mut builder, &path, &bytes, mode)?;
708        }
709        builder.finish()?;
710    }
711
712    zstd::stream::encode_all(Cursor::new(tar_bytes), 0).map_err(Into::into)
713}
714
715pub fn read_harnpack(bytes: &[u8]) -> Result<HarnpackArchive, WorkflowBundleError> {
716    let tar_bytes = decompress_harnpack_zstd(bytes)?;
717    let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
718    let mut manifest = None;
719    let mut contents = Vec::new();
720    let mut seen = BTreeSet::new();
721
722    for entry in archive.entries()? {
723        let mut entry = entry?;
724        if entry.header().entry_type().is_dir() {
725            continue;
726        }
727        let path = normalize_archive_path(entry.path()?.as_ref())?;
728        if !seen.insert(path.clone()) {
729            return Err(WorkflowBundleError::new(
730                WorkflowBundleErrorKind::DuplicateArchiveEntry,
731                format!("duplicate archive entry: {path}"),
732            ));
733        }
734        let mode = entry.header().mode().unwrap_or(DEFAULT_HARNPACK_FILE_MODE);
735        let mut entry_bytes = Vec::new();
736        entry.read_to_end(&mut entry_bytes)?;
737
738        if path == HARNPACK_MANIFEST_PATH {
739            manifest = Some(parse_workflow_bundle_manifest(&entry_bytes)?);
740        } else {
741            contents.push(HarnpackEntry {
742                path: PathBuf::from(path),
743                bytes: entry_bytes,
744                mode,
745            });
746        }
747    }
748
749    contents.sort_by(|left, right| left.path.cmp(&right.path));
750    Ok(HarnpackArchive {
751        manifest: manifest.ok_or_else(|| {
752            WorkflowBundleError::new(
753                WorkflowBundleErrorKind::InvalidArchive,
754                format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
755            )
756        })?,
757        contents,
758    })
759}
760
761pub fn current_provider_catalog_hash_blake3() -> Result<String, WorkflowBundleError> {
762    let bytes = serde_json::to_vec(&crate::provider_catalog::artifact())?;
763    Ok(blake3_hash_bytes(&bytes))
764}
765
766pub fn workflow_graph_digest(graph: &WorkflowGraph) -> String {
767    let mut canonical = canonical_workflow_graph(graph);
768    canonical.audit_log.clear();
769    let bytes = serde_json::to_vec(&canonical).expect("workflow graph serializes");
770    let digest = Sha256::digest(bytes);
771    let hex = digest
772        .iter()
773        .map(|byte| format!("{byte:02x}"))
774        .collect::<String>();
775    format!("sha256:{hex}")
776}
777
778fn canonical_workflow_bundle_manifest(bundle: &WorkflowBundle) -> WorkflowBundle {
779    let mut canonical = bundle.clone();
780    canonical.workflow = canonical_workflow_graph(&bundle.workflow);
781    canonical.transitive_modules.sort_by(|left, right| {
782        (
783            path_sort_key(&left.path),
784            &left.source_hash_blake3,
785            &left.harnbc_hash_blake3,
786        )
787            .cmp(&(
788                path_sort_key(&right.path),
789                &right.source_hash_blake3,
790                &right.harnbc_hash_blake3,
791            ))
792    });
793    canonical.tool_manifest.sort_by(|left, right| {
794        (&left.name, &left.provider, &left.schema_hash_blake3).cmp(&(
795            &right.name,
796            &right.provider,
797            &right.schema_hash_blake3,
798        ))
799    });
800    canonical.sbom.packages.sort_by(|left, right| {
801        (&left.name, &left.version, &left.package_hash_blake3).cmp(&(
802            &right.name,
803            &right.version,
804            &right.package_hash_blake3,
805        ))
806    });
807    canonical.sbom.relationships.sort_by(|left, right| {
808        (&left.from, &left.to, &left.relationship_type).cmp(&(
809            &right.from,
810            &right.to,
811            &right.relationship_type,
812        ))
813    });
814    canonical
815}
816
817fn decode_hex_exact<const N: usize>(
818    label: &str,
819    value: &str,
820) -> Result<[u8; N], WorkflowBundleError> {
821    let bytes = hex::decode(value).map_err(|error| {
822        WorkflowBundleError::new(
823            WorkflowBundleErrorKind::InvalidSignature,
824            format!("{label} is not hex: {error}"),
825        )
826    })?;
827    bytes.try_into().map_err(|bytes: Vec<u8>| {
828        WorkflowBundleError::new(
829            WorkflowBundleErrorKind::InvalidSignature,
830            format!("{label} must decode to {N} bytes, got {}", bytes.len()),
831        )
832    })
833}
834
835fn append_harnpack_entry<W: std::io::Write>(
836    builder: &mut tar::Builder<W>,
837    path: &str,
838    bytes: &[u8],
839    mode: u32,
840) -> Result<(), WorkflowBundleError> {
841    let mut header = tar::Header::new_gnu();
842    header.set_path(path).map_err(|error| {
843        WorkflowBundleError::new(
844            WorkflowBundleErrorKind::UnsafeArchivePath,
845            format!("invalid archive path {path}: {error}"),
846        )
847    })?;
848    header.set_entry_type(tar::EntryType::Regular);
849    header.set_size(bytes.len() as u64);
850    header.set_mode(mode);
851    header.set_mtime(0);
852    header.set_uid(0);
853    header.set_gid(0);
854    header.set_cksum();
855    builder.append(&header, bytes)?;
856    Ok(())
857}
858
859fn normalize_archive_path(path: &Path) -> Result<String, WorkflowBundleError> {
860    let mut parts = Vec::new();
861    for component in path.components() {
862        match component {
863            Component::Normal(part) => {
864                let Some(part) = part.to_str() else {
865                    return Err(WorkflowBundleError::new(
866                        WorkflowBundleErrorKind::UnsafeArchivePath,
867                        format!("archive path is not valid UTF-8: {}", path.display()),
868                    ));
869                };
870                if part.is_empty() {
871                    return Err(WorkflowBundleError::new(
872                        WorkflowBundleErrorKind::UnsafeArchivePath,
873                        "archive path contains an empty component",
874                    ));
875                }
876                parts.push(part.to_string());
877            }
878            Component::CurDir => {}
879            Component::ParentDir | Component::RootDir | Component::Prefix(_) => {
880                return Err(WorkflowBundleError::new(
881                    WorkflowBundleErrorKind::UnsafeArchivePath,
882                    format!(
883                        "archive path must be relative and contained: {}",
884                        path.display()
885                    ),
886                ));
887            }
888        }
889    }
890    if parts.is_empty() {
891        return Err(WorkflowBundleError::new(
892            WorkflowBundleErrorKind::UnsafeArchivePath,
893            "archive path is empty",
894        ));
895    }
896    Ok(parts.join("/"))
897}
898
899fn path_sort_key(path: &Path) -> String {
900    path.components()
901        .filter_map(|component| match component {
902            Component::Normal(part) => part.to_str().map(ToOwned::to_owned),
903            _ => None,
904        })
905        .collect::<Vec<_>>()
906        .join("/")
907}
908
909fn blake3_hash_bytes(bytes: &[u8]) -> String {
910    blake3_digest_string(blake3::hash(bytes))
911}
912
913fn blake3_digest_string(hash: blake3::Hash) -> String {
914    format!("blake3:{hash}")
915}
916
917pub fn validate_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundleValidationReport {
918    let canonical = canonical_workflow_graph(&bundle.workflow);
919    let mut report = WorkflowBundleValidationReport {
920        valid: true,
921        bundle_id: bundle.id.clone(),
922        workflow_id: canonical.id.clone(),
923        graph_digest: workflow_graph_digest(&canonical),
924        errors: Vec::new(),
925        warnings: Vec::new(),
926    };
927
928    validate_manifest_contract(bundle, &mut report);
929    validate_bundle_identity(bundle, &canonical, &mut report);
930    validate_triggers(bundle, &canonical, &mut report);
931    validate_prompt_capsules(bundle, &canonical, &mut report);
932    validate_policy(bundle, &mut report);
933    validate_connectors(bundle, &mut report);
934    validate_environment(bundle, &mut report);
935
936    let graph_report = validate_workflow(&canonical, None);
937    for error in graph_report.errors {
938        let node_id = workflow_diagnostic_node_id(&error, &canonical);
939        push_error(&mut report, "workflow", error, node_id);
940    }
941    for warning in graph_report.warnings {
942        let node_id = workflow_diagnostic_node_id(&warning, &canonical);
943        push_warning(&mut report, "workflow", warning, node_id);
944    }
945
946    if let Some(expected) = bundle.receipts.graph_digest.as_deref() {
947        if expected != report.graph_digest {
948            let actual = report.graph_digest.clone();
949            push_error(
950                &mut report,
951                "receipts.graph_digest",
952                format!("graph digest mismatch: expected {expected}, computed {actual}"),
953                None,
954            );
955        }
956    }
957    if let Some(expected_version) = bundle.receipts.workflow_version {
958        if expected_version != canonical.version {
959            push_error(
960                &mut report,
961                "receipts.workflow_version",
962                format!(
963                    "workflow version mismatch: expected {expected_version}, computed {}",
964                    canonical.version
965                ),
966                None,
967            );
968        }
969    }
970
971    report.valid = report.errors.is_empty();
972    report
973}
974
975pub fn preview_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundlePreview {
976    let canonical = canonical_workflow_graph(&bundle.workflow);
977    let validation = validate_workflow_bundle(bundle);
978    let graph = export_workflow_bundle_graph(bundle, &validation);
979    let mermaid = graph.mermaid.clone();
980    let triggers_by_node = triggers_by_node(bundle);
981    let capsules_by_node = capsules_by_node(bundle);
982    let mut nodes = Vec::new();
983
984    for (node_id, node) in &canonical.nodes {
985        let mut outgoing = canonical
986            .edges
987            .iter()
988            .filter(|edge| edge.from == *node_id)
989            .map(|edge| edge.to.clone())
990            .collect::<Vec<_>>();
991        outgoing.sort();
992        outgoing.dedup();
993        nodes.push(WorkflowBundlePreviewNode {
994            id: node_id.clone(),
995            kind: node.kind.clone(),
996            label: node.task_label.clone(),
997            prompt_capsule: capsules_by_node.get(node_id).cloned(),
998            trigger_ids: triggers_by_node.get(node_id).cloned().unwrap_or_default(),
999            outgoing,
1000        });
1001    }
1002
1003    WorkflowBundlePreview {
1004        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1005        bundle_id: bundle.id.clone(),
1006        bundle_version: bundle.version.clone(),
1007        workflow_id: canonical.id.clone(),
1008        workflow_version: canonical.version,
1009        graph_digest: validation.graph_digest.clone(),
1010        validation,
1011        graph,
1012        mermaid,
1013        triggers: bundle.triggers.clone(),
1014        connectors: bundle.connectors.clone(),
1015        environment: bundle.environment.clone(),
1016        nodes,
1017        edges: sorted_edges(&canonical),
1018    }
1019}
1020
1021pub fn export_workflow_bundle_graph(
1022    bundle: &WorkflowBundle,
1023    validation: &WorkflowBundleValidationReport,
1024) -> WorkflowBundleGraphExport {
1025    let canonical = canonical_workflow_graph(&bundle.workflow);
1026    let mut nodes = Vec::new();
1027    let mut edges = Vec::new();
1028    let mut editable_fields = Vec::new();
1029    let capsules_by_node = capsules_by_node(bundle);
1030    let catchup_enabled = bundle.policy.catchup.mode != "none";
1031    let retry_can_dlq = bundle.policy.retry.max_attempts > 1;
1032
1033    for (index, connector) in bundle.connectors.iter().enumerate() {
1034        let node_fields = connector_editable_fields(index, connector);
1035        editable_fields.extend(node_fields.clone());
1036        nodes.push(WorkflowBundleGraphNode {
1037            id: connector_graph_id(&connector.id),
1038            node_type: "connector_call".to_string(),
1039            label: connector_label(connector),
1040            workflow_node_id: None,
1041            trigger_id: None,
1042            connector_id: Some(connector.id.clone()),
1043            editable_fields: node_fields,
1044            metadata: BTreeMap::from([
1045                (
1046                    "provider_id".to_string(),
1047                    serde_json::json!(connector.provider_id),
1048                ),
1049                ("scopes".to_string(), serde_json::json!(connector.scopes)),
1050            ]),
1051        });
1052    }
1053
1054    let catchup_fields = catchup_editable_fields();
1055    let retry_fields = retry_editable_fields();
1056    if catchup_enabled {
1057        let node_fields = catchup_fields.clone();
1058        editable_fields.extend(node_fields.clone());
1059        nodes.push(WorkflowBundleGraphNode {
1060            id: catchup_graph_id(),
1061            node_type: "catchup".to_string(),
1062            label: "Catch up".to_string(),
1063            workflow_node_id: None,
1064            trigger_id: None,
1065            connector_id: None,
1066            editable_fields: node_fields,
1067            metadata: BTreeMap::from([(
1068                "mode".to_string(),
1069                serde_json::json!(bundle.policy.catchup.mode),
1070            )]),
1071        });
1072    } else {
1073        editable_fields.extend(catchup_fields);
1074    }
1075    if retry_can_dlq {
1076        let node_fields = retry_fields.clone();
1077        editable_fields.extend(node_fields.clone());
1078        nodes.push(WorkflowBundleGraphNode {
1079            id: dlq_graph_id(),
1080            node_type: "dlq".to_string(),
1081            label: "Dead letter queue".to_string(),
1082            workflow_node_id: None,
1083            trigger_id: None,
1084            connector_id: None,
1085            editable_fields: node_fields,
1086            metadata: BTreeMap::from([(
1087                "max_attempts".to_string(),
1088                serde_json::json!(bundle.policy.retry.max_attempts),
1089            )]),
1090        });
1091    } else {
1092        editable_fields.extend(retry_fields);
1093    }
1094
1095    for (index, trigger) in bundle.triggers.iter().enumerate() {
1096        let node_fields = trigger_editable_fields(index, trigger);
1097        editable_fields.extend(node_fields.clone());
1098        nodes.push(WorkflowBundleGraphNode {
1099            id: trigger_graph_id(&trigger.id),
1100            node_type: "trigger".to_string(),
1101            label: trigger_label(trigger),
1102            workflow_node_id: trigger.node_id.clone(),
1103            trigger_id: Some(trigger.id.clone()),
1104            connector_id: None,
1105            editable_fields: node_fields,
1106            metadata: BTreeMap::from([
1107                ("kind".to_string(), serde_json::json!(trigger.kind)),
1108                ("provider".to_string(), serde_json::json!(trigger.provider)),
1109                ("events".to_string(), serde_json::json!(trigger.events)),
1110            ]),
1111        });
1112        if let Some(provider) = trigger.provider.as_deref() {
1113            if let Some(connector) = bundle
1114                .connectors
1115                .iter()
1116                .find(|connector| connector.provider_id == provider || connector.id == provider)
1117            {
1118                edges.push(WorkflowBundleGraphEdge {
1119                    from: connector_graph_id(&connector.id),
1120                    to: trigger_graph_id(&trigger.id),
1121                    label: Some("binds".to_string()),
1122                    branch: None,
1123                });
1124            }
1125        }
1126        let target = trigger
1127            .node_id
1128            .clone()
1129            .unwrap_or_else(|| canonical.entry.clone());
1130        if catchup_enabled {
1131            edges.push(WorkflowBundleGraphEdge {
1132                from: trigger_graph_id(&trigger.id),
1133                to: catchup_graph_id(),
1134                label: Some(bundle.policy.catchup.mode.clone()),
1135                branch: Some("catchup".to_string()),
1136            });
1137            edges.push(WorkflowBundleGraphEdge {
1138                from: catchup_graph_id(),
1139                to: workflow_graph_id(&target),
1140                label: Some("dispatch".to_string()),
1141                branch: None,
1142            });
1143        } else {
1144            edges.push(WorkflowBundleGraphEdge {
1145                from: trigger_graph_id(&trigger.id),
1146                to: workflow_graph_id(&target),
1147                label: Some("dispatch".to_string()),
1148                branch: None,
1149            });
1150        }
1151    }
1152
1153    for (node_id, node) in &canonical.nodes {
1154        let capsule_id = capsules_by_node.get(node_id);
1155        let node_fields = workflow_node_editable_fields(node_id, capsule_id);
1156        editable_fields.extend(node_fields.clone());
1157        nodes.push(WorkflowBundleGraphNode {
1158            id: workflow_graph_id(node_id),
1159            node_type: workflow_node_type(&node.kind),
1160            label: workflow_node_label(node_id, node),
1161            workflow_node_id: Some(node_id.clone()),
1162            trigger_id: None,
1163            connector_id: None,
1164            editable_fields: node_fields,
1165            metadata: BTreeMap::from([
1166                ("kind".to_string(), serde_json::json!(node.kind)),
1167                ("task_label".to_string(), serde_json::json!(node.task_label)),
1168                (
1169                    "prompt_capsule".to_string(),
1170                    serde_json::json!(capsule_id.cloned()),
1171                ),
1172            ]),
1173        });
1174    }
1175
1176    for edge in sorted_edges(&canonical) {
1177        edges.push(WorkflowBundleGraphEdge {
1178            from: workflow_graph_id(&edge.from),
1179            to: workflow_graph_id(&edge.to),
1180            label: edge.label.clone(),
1181            branch: edge.branch.clone(),
1182        });
1183    }
1184
1185    let outgoing: BTreeSet<&str> = canonical
1186        .edges
1187        .iter()
1188        .map(|edge| edge.from.as_str())
1189        .collect();
1190    for node_id in canonical.nodes.keys() {
1191        if !outgoing.contains(node_id.as_str()) {
1192            edges.push(WorkflowBundleGraphEdge {
1193                from: workflow_graph_id(node_id),
1194                to: terminal_completed_graph_id(),
1195                label: Some("completed".to_string()),
1196                branch: Some("completed".to_string()),
1197            });
1198        }
1199        if retry_can_dlq {
1200            edges.push(WorkflowBundleGraphEdge {
1201                from: workflow_graph_id(node_id),
1202                to: dlq_graph_id(),
1203                label: Some("retry exhausted".to_string()),
1204                branch: Some("failed".to_string()),
1205            });
1206        }
1207    }
1208
1209    nodes.push(WorkflowBundleGraphNode {
1210        id: terminal_completed_graph_id(),
1211        node_type: "terminal".to_string(),
1212        label: "Completed".to_string(),
1213        workflow_node_id: None,
1214        trigger_id: None,
1215        connector_id: None,
1216        editable_fields: Vec::new(),
1217        metadata: BTreeMap::from([("status".to_string(), serde_json::json!("completed"))]),
1218    });
1219    nodes.push(WorkflowBundleGraphNode {
1220        id: terminal_failed_graph_id(),
1221        node_type: "terminal".to_string(),
1222        label: "Failed".to_string(),
1223        workflow_node_id: None,
1224        trigger_id: None,
1225        connector_id: None,
1226        editable_fields: Vec::new(),
1227        metadata: BTreeMap::from([("status".to_string(), serde_json::json!("failed"))]),
1228    });
1229    if retry_can_dlq {
1230        edges.push(WorkflowBundleGraphEdge {
1231            from: dlq_graph_id(),
1232            to: terminal_failed_graph_id(),
1233            label: Some("failed".to_string()),
1234            branch: Some("failed".to_string()),
1235        });
1236    }
1237
1238    nodes.sort_by(|left, right| left.id.cmp(&right.id));
1239    edges.sort_by(|left, right| {
1240        (&left.from, &left.to, &left.branch, &left.label).cmp(&(
1241            &right.from,
1242            &right.to,
1243            &right.branch,
1244            &right.label,
1245        ))
1246    });
1247    editable_fields.sort_by(|left, right| left.id.cmp(&right.id));
1248
1249    let diagnostics = validation
1250        .errors
1251        .iter()
1252        .chain(validation.warnings.iter())
1253        .map(|diagnostic| WorkflowBundleGraphDiagnostic {
1254            severity: diagnostic.severity.clone(),
1255            path: diagnostic.path.clone(),
1256            message: diagnostic.message.clone(),
1257            node_id: diagnostic.node_id.clone(),
1258            graph_node_id: diagnostic.node_id.as_deref().map(workflow_graph_id),
1259        })
1260        .collect::<Vec<_>>();
1261    let mermaid = render_workflow_bundle_mermaid(&nodes, &edges);
1262
1263    WorkflowBundleGraphExport {
1264        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1265        graph_id: canonical.id,
1266        graph_digest: validation.graph_digest.clone(),
1267        nodes,
1268        edges,
1269        diagnostics,
1270        editable_fields,
1271        mermaid,
1272    }
1273}
1274
1275pub fn run_workflow_bundle(
1276    bundle: &WorkflowBundle,
1277    request: WorkflowBundleRunRequest,
1278) -> Result<WorkflowBundleRunReceipt, WorkflowBundleValidationReport> {
1279    let validation = validate_workflow_bundle(bundle);
1280    if !validation.valid {
1281        return Err(validation);
1282    }
1283
1284    let canonical = canonical_workflow_graph(&bundle.workflow);
1285    let trigger_id = match request.trigger_id {
1286        Some(trigger_id)
1287            if !bundle
1288                .triggers
1289                .iter()
1290                .any(|trigger| trigger.id == trigger_id) =>
1291        {
1292            let mut report = validation;
1293            push_error(
1294                &mut report,
1295                "trigger_id",
1296                format!("unknown trigger id: {trigger_id}"),
1297                None,
1298            );
1299            report.valid = false;
1300            return Err(report);
1301        }
1302        Some(trigger_id) => Some(trigger_id),
1303        None => bundle.triggers.first().map(|trigger| trigger.id.clone()),
1304    };
1305    let mut event_ids = bundle.receipts.event_ids.clone();
1306    if let Some(event_id) = request.event_id {
1307        if !event_ids.contains(&event_id) {
1308            event_ids.push(event_id);
1309        }
1310    }
1311    let run_id = bundle
1312        .receipts
1313        .run_id
1314        .clone()
1315        .unwrap_or_else(|| default_run_id(bundle, &validation.graph_digest));
1316    let capsules_by_node = capsules_by_node(bundle);
1317    let executed_nodes = execution_order(&canonical)
1318        .into_iter()
1319        .map(|node_id| {
1320            let node = canonical
1321                .nodes
1322                .get(&node_id)
1323                .expect("execution order only contains known nodes");
1324            WorkflowBundleRunNodeReceipt {
1325                node_id: node_id.clone(),
1326                kind: node.kind.clone(),
1327                prompt_capsule: capsules_by_node.get(&node_id).cloned(),
1328                status: "completed".to_string(),
1329            }
1330        })
1331        .collect();
1332
1333    Ok(WorkflowBundleRunReceipt {
1334        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1335        receipt_type: WORKFLOW_BUNDLE_RECEIPT_TYPE.to_string(),
1336        bundle_id: bundle.id.clone(),
1337        bundle_version: bundle.version.clone(),
1338        workflow_id: canonical.id,
1339        workflow_version: canonical.version,
1340        graph_digest: validation.graph_digest,
1341        run_id,
1342        trigger_id,
1343        event_ids,
1344        status: "completed".to_string(),
1345        executed_nodes,
1346        policy: bundle.policy.clone(),
1347        connectors: bundle.connectors.clone(),
1348        environment: bundle.environment.clone(),
1349    })
1350}
1351
1352fn canonical_workflow_graph(graph: &WorkflowGraph) -> WorkflowGraph {
1353    let mut canonical = graph.clone();
1354    if canonical.type_name.is_empty() {
1355        canonical.type_name = "workflow_graph".to_string();
1356    }
1357    if canonical.version == 0 {
1358        canonical.version = 1;
1359    }
1360    if canonical.entry.is_empty() {
1361        canonical.entry = canonical.nodes.keys().next().cloned().unwrap_or_default();
1362    }
1363    for (node_id, node) in &mut canonical.nodes {
1364        if node.id.is_none() {
1365            node.id = Some(node_id.clone());
1366        }
1367        if node.kind.is_empty() {
1368            node.kind = "stage".to_string();
1369        }
1370        if node.retry_policy.max_attempts == 0 {
1371            node.retry_policy.max_attempts = 1;
1372        }
1373    }
1374    canonical.edges = sorted_edges(&canonical);
1375    canonical
1376}
1377
1378fn sorted_edges(graph: &WorkflowGraph) -> Vec<WorkflowEdge> {
1379    let mut edges = graph.edges.clone();
1380    edges.sort_by(|left, right| {
1381        (
1382            &left.from,
1383            &left.to,
1384            left.branch.as_deref(),
1385            left.label.as_deref(),
1386        )
1387            .cmp(&(
1388                &right.from,
1389                &right.to,
1390                right.branch.as_deref(),
1391                right.label.as_deref(),
1392            ))
1393    });
1394    edges
1395}
1396
1397fn validate_bundle_identity(
1398    bundle: &WorkflowBundle,
1399    graph: &WorkflowGraph,
1400    report: &mut WorkflowBundleValidationReport,
1401) {
1402    if bundle.schema_version != WORKFLOW_BUNDLE_SCHEMA_VERSION {
1403        push_error(
1404            report,
1405            "schema_version",
1406            format!(
1407                "unsupported schema_version {}; expected {}",
1408                bundle.schema_version, WORKFLOW_BUNDLE_SCHEMA_VERSION
1409            ),
1410            None,
1411        );
1412    }
1413    if bundle.id.trim().is_empty() {
1414        push_error(report, "id", "bundle id is required", None);
1415    }
1416    if bundle.version.trim().is_empty() {
1417        push_error(report, "version", "bundle version is required", None);
1418    }
1419    if graph.id.trim().is_empty() {
1420        push_error(
1421            report,
1422            "workflow.id",
1423            "workflow id is required for portable bundles",
1424            None,
1425        );
1426    }
1427    if graph.nodes.is_empty() {
1428        push_error(
1429            report,
1430            "workflow.nodes",
1431            "workflow must contain nodes",
1432            None,
1433        );
1434    }
1435    for (node_id, node) in &graph.nodes {
1436        if node_id.trim().is_empty() {
1437            push_error(report, "workflow.nodes", "node id is required", None);
1438        }
1439        if node.id.as_deref().is_some_and(|id| id != node_id) {
1440            push_error(
1441                report,
1442                format!("workflow.nodes.{node_id}.id"),
1443                "node id field must match its map key",
1444                Some(node_id.clone()),
1445            );
1446        }
1447    }
1448}
1449
1450fn validate_manifest_contract(
1451    bundle: &WorkflowBundle,
1452    report: &mut WorkflowBundleValidationReport,
1453) {
1454    validate_relative_path(
1455        report,
1456        "entrypoint",
1457        &bundle.entrypoint,
1458        "entrypoint is required",
1459    );
1460    if bundle.transitive_modules.is_empty() {
1461        push_error(
1462            report,
1463            "transitive_modules",
1464            "at least one transitive module entry is required",
1465            None,
1466        );
1467    }
1468    let mut module_paths = BTreeSet::new();
1469    for (index, module) in bundle.transitive_modules.iter().enumerate() {
1470        let path = format!("transitive_modules[{index}]");
1471        validate_relative_path(
1472            report,
1473            format!("{path}.path"),
1474            &module.path,
1475            "module path is required",
1476        );
1477        if !module.path.as_os_str().is_empty() && !module_paths.insert(path_sort_key(&module.path))
1478        {
1479            push_error(
1480                report,
1481                format!("{path}.path"),
1482                format!(
1483                    "duplicate transitive module path: {}",
1484                    module.path.display()
1485                ),
1486                None,
1487            );
1488        }
1489        validate_blake3_hash(
1490            report,
1491            format!("{path}.source_hash_blake3"),
1492            &module.source_hash_blake3,
1493            true,
1494        );
1495        validate_blake3_hash(
1496            report,
1497            format!("{path}.harnbc_hash_blake3"),
1498            &module.harnbc_hash_blake3,
1499            true,
1500        );
1501    }
1502    if bundle.stdlib_version.trim().is_empty() {
1503        push_error(report, "stdlib_version", "stdlib_version is required", None);
1504    }
1505    if bundle.harn_version.trim().is_empty() {
1506        push_error(report, "harn_version", "harn_version is required", None);
1507    }
1508    validate_blake3_hash(
1509        report,
1510        "provider_catalog_hash",
1511        &bundle.provider_catalog_hash,
1512        true,
1513    );
1514
1515    let mut tool_names = BTreeSet::new();
1516    for (index, tool) in bundle.tool_manifest.iter().enumerate() {
1517        let path = format!("tool_manifest[{index}]");
1518        if tool.name.trim().is_empty() {
1519            push_error(
1520                report,
1521                format!("{path}.name"),
1522                "tool manifest entry name is required",
1523                None,
1524            );
1525        } else if !tool_names.insert(tool.name.clone()) {
1526            push_error(
1527                report,
1528                format!("{path}.name"),
1529                format!("duplicate tool manifest entry: {}", tool.name),
1530                None,
1531            );
1532        }
1533        if let Some(hash) = tool.schema_hash_blake3.as_deref() {
1534            validate_blake3_hash(report, format!("{path}.schema_hash_blake3"), hash, true);
1535        }
1536    }
1537
1538    if bundle.sbom.format.trim().is_empty() {
1539        push_error(report, "sbom.format", "SBOM format is required", None);
1540    }
1541    if bundle.sbom.version.trim().is_empty() {
1542        push_error(report, "sbom.version", "SBOM version is required", None);
1543    }
1544    let mut sbom_packages = BTreeSet::new();
1545    for (index, package) in bundle.sbom.packages.iter().enumerate() {
1546        let path = format!("sbom.packages[{index}]");
1547        if package.name.trim().is_empty() {
1548            push_error(
1549                report,
1550                format!("{path}.name"),
1551                "SBOM package name is required",
1552                None,
1553            );
1554        } else if !sbom_packages.insert(package.name.clone()) {
1555            push_error(
1556                report,
1557                format!("{path}.name"),
1558                format!("duplicate SBOM package: {}", package.name),
1559                None,
1560            );
1561        }
1562        if let Some(hash) = package.package_hash_blake3.as_deref() {
1563            validate_blake3_hash(report, format!("{path}.package_hash_blake3"), hash, true);
1564        }
1565    }
1566    for (index, relationship) in bundle.sbom.relationships.iter().enumerate() {
1567        let path = format!("sbom.relationships[{index}]");
1568        if relationship.from.trim().is_empty() {
1569            push_error(
1570                report,
1571                format!("{path}.from"),
1572                "SBOM relationship source is required",
1573                None,
1574            );
1575        }
1576        if relationship.to.trim().is_empty() {
1577            push_error(
1578                report,
1579                format!("{path}.to"),
1580                "SBOM relationship target is required",
1581                None,
1582            );
1583        }
1584        if relationship.relationship_type.trim().is_empty() {
1585            push_error(
1586                report,
1587                format!("{path}.relationship_type"),
1588                "SBOM relationship type is required",
1589                None,
1590            );
1591        }
1592    }
1593
1594    if let Some(parent_id) = bundle.parent_trust_record_id.as_deref() {
1595        if parent_id.trim().is_empty() {
1596            push_error(
1597                report,
1598                "parent_trust_record_id",
1599                "parent_trust_record_id cannot be empty when present",
1600                None,
1601            );
1602        }
1603    }
1604    if let Some(signature) = &bundle.signature {
1605        if signature.algorithm.trim().is_empty() {
1606            push_error(
1607                report,
1608                "signature.algorithm",
1609                "signature algorithm is required",
1610                None,
1611            );
1612        } else if signature.algorithm != "ed25519" {
1613            push_error(
1614                report,
1615                "signature.algorithm",
1616                "signature algorithm must be ed25519",
1617                None,
1618            );
1619        }
1620        if signature.public_key.trim().is_empty() {
1621            push_error(
1622                report,
1623                "signature.public_key",
1624                "signature public_key is required",
1625                None,
1626            );
1627        }
1628        if signature.signature.trim().is_empty() {
1629            push_error(
1630                report,
1631                "signature.signature",
1632                "signature value is required",
1633                None,
1634            );
1635        }
1636        validate_blake3_hash(
1637            report,
1638            "signature.manifest_hash_blake3",
1639            &signature.manifest_hash_blake3,
1640            true,
1641        );
1642    }
1643}
1644
1645fn validate_relative_path(
1646    report: &mut WorkflowBundleValidationReport,
1647    path: impl Into<String>,
1648    value: &Path,
1649    empty_message: &str,
1650) {
1651    let path = path.into();
1652    if value.as_os_str().is_empty() {
1653        push_error(report, path, empty_message, None);
1654        return;
1655    }
1656    if normalize_archive_path(value).is_err() {
1657        push_error(
1658            report,
1659            path,
1660            format!("path must be relative and contained: {}", value.display()),
1661            None,
1662        );
1663    }
1664}
1665
1666fn validate_blake3_hash(
1667    report: &mut WorkflowBundleValidationReport,
1668    path: impl Into<String>,
1669    value: &str,
1670    required: bool,
1671) {
1672    let path = path.into();
1673    if value.trim().is_empty() {
1674        if required {
1675            push_error(report, path, "BLAKE3 hash is required", None);
1676        }
1677        return;
1678    }
1679    let Some(hex) = value.strip_prefix("blake3:") else {
1680        push_error(report, path, "BLAKE3 hash must use blake3:<hex>", None);
1681        return;
1682    };
1683    if hex.len() != 64
1684        || !hex
1685            .bytes()
1686            .all(|byte| byte.is_ascii_digit() || matches!(byte, b'a'..=b'f'))
1687    {
1688        push_error(
1689            report,
1690            path,
1691            "BLAKE3 hash must contain 64 lowercase hex digits",
1692            None,
1693        );
1694    }
1695}
1696
1697fn validate_triggers(
1698    bundle: &WorkflowBundle,
1699    graph: &WorkflowGraph,
1700    report: &mut WorkflowBundleValidationReport,
1701) {
1702    if bundle.triggers.is_empty() {
1703        push_warning(report, "triggers", "bundle declares no triggers", None);
1704    }
1705    let mut ids = BTreeSet::new();
1706    for (index, trigger) in bundle.triggers.iter().enumerate() {
1707        let path = format!("triggers[{index}]");
1708        if trigger.id.trim().is_empty() {
1709            push_error(report, format!("{path}.id"), "trigger id is required", None);
1710        } else if !ids.insert(trigger.id.clone()) {
1711            push_error(
1712                report,
1713                format!("{path}.id"),
1714                format!("duplicate trigger id: {}", trigger.id),
1715                None,
1716            );
1717        }
1718        match trigger.kind.as_str() {
1719            "github" => {
1720                if trigger.provider.as_deref() != Some("github") {
1721                    push_error(
1722                        report,
1723                        format!("{path}.provider"),
1724                        "github triggers require provider=\"github\"",
1725                        None,
1726                    );
1727                }
1728                if trigger.events.is_empty() {
1729                    push_error(
1730                        report,
1731                        format!("{path}.events"),
1732                        "github triggers require at least one event",
1733                        None,
1734                    );
1735                }
1736            }
1737            "cron" if trigger.schedule.is_none() => push_error(
1738                report,
1739                format!("{path}.schedule"),
1740                "cron triggers require schedule",
1741                None,
1742            ),
1743            "delay" if trigger.delay.is_none() => push_error(
1744                report,
1745                format!("{path}.delay"),
1746                "delay triggers require delay",
1747                None,
1748            ),
1749            "webhook" if trigger.webhook_path.is_none() => push_error(
1750                report,
1751                format!("{path}.webhook_path"),
1752                "webhook triggers require webhook_path",
1753                None,
1754            ),
1755            "mcp" if trigger.mcp_tool.is_none() => push_error(
1756                report,
1757                format!("{path}.mcp_tool"),
1758                "mcp triggers require mcp_tool",
1759                None,
1760            ),
1761            "manual" => {}
1762            "" => push_error(
1763                report,
1764                format!("{path}.kind"),
1765                "trigger kind is required",
1766                None,
1767            ),
1768            other
1769                if !matches!(
1770                    other,
1771                    "github" | "cron" | "delay" | "webhook" | "mcp" | "manual"
1772                ) =>
1773            {
1774                push_error(
1775                    report,
1776                    format!("{path}.kind"),
1777                    format!("unsupported trigger kind: {other}"),
1778                    None,
1779                );
1780            }
1781            _ => {}
1782        }
1783        if let Some(node_id) = trigger.node_id.as_deref() {
1784            if !graph.nodes.contains_key(node_id) {
1785                push_error(
1786                    report,
1787                    format!("{path}.node_id"),
1788                    format!("trigger references unknown node: {node_id}"),
1789                    Some(node_id.to_string()),
1790                );
1791            }
1792        }
1793    }
1794}
1795
1796fn validate_prompt_capsules(
1797    bundle: &WorkflowBundle,
1798    graph: &WorkflowGraph,
1799    report: &mut WorkflowBundleValidationReport,
1800) {
1801    let trigger_ids: BTreeSet<&str> = bundle
1802        .triggers
1803        .iter()
1804        .map(|trigger| trigger.id.as_str())
1805        .collect();
1806    let mut node_refs = BTreeSet::new();
1807    for (key, capsule) in &bundle.prompt_capsules {
1808        let path = format!("prompt_capsules.{key}");
1809        if capsule.id.trim().is_empty() {
1810            push_error(
1811                report,
1812                format!("{path}.id"),
1813                "prompt capsule id is required",
1814                None,
1815            );
1816        } else if capsule.id != *key {
1817            push_error(
1818                report,
1819                format!("{path}.id"),
1820                "prompt capsule id must match its map key",
1821                None,
1822            );
1823        }
1824        if capsule.prompt.trim().is_empty() {
1825            push_error(
1826                report,
1827                format!("{path}.prompt"),
1828                "prompt capsule prompt is required",
1829                Some(capsule.node_id.clone()),
1830            );
1831        }
1832        if !graph.nodes.contains_key(&capsule.node_id) {
1833            push_error(
1834                report,
1835                format!("{path}.node_id"),
1836                format!(
1837                    "prompt capsule references unknown node: {}",
1838                    capsule.node_id
1839                ),
1840                Some(capsule.node_id.clone()),
1841            );
1842        }
1843        if !capsule.node_id.is_empty() && !node_refs.insert(capsule.node_id.clone()) {
1844            push_error(
1845                report,
1846                format!("{path}.node_id"),
1847                format!("multiple prompt capsules target node {}", capsule.node_id),
1848                Some(capsule.node_id.clone()),
1849            );
1850        }
1851        if let Some(trigger_id) = capsule.trigger_id.as_deref() {
1852            if !trigger_ids.contains(trigger_id) {
1853                push_error(
1854                    report,
1855                    format!("{path}.trigger_id"),
1856                    format!("prompt capsule references unknown trigger: {trigger_id}"),
1857                    Some(capsule.node_id.clone()),
1858                );
1859            }
1860        }
1861    }
1862}
1863
1864fn validate_policy(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1865    if !matches!(
1866        bundle.policy.autonomy_tier.as_str(),
1867        "shadow" | "suggest" | "act_with_approval" | "act_auto"
1868    ) {
1869        push_error(
1870            report,
1871            "policy.autonomy_tier",
1872            "autonomy_tier must be shadow, suggest, act_with_approval, or act_auto",
1873            None,
1874        );
1875    }
1876    if bundle.policy.retry.max_attempts == 0 {
1877        push_error(
1878            report,
1879            "policy.retry.max_attempts",
1880            "retry.max_attempts must be at least 1",
1881            None,
1882        );
1883    }
1884    if !matches!(
1885        bundle.policy.catchup.mode.as_str(),
1886        "none" | "latest" | "all"
1887    ) {
1888        push_error(
1889            report,
1890            "policy.catchup.mode",
1891            "catchup.mode must be none, latest, or all",
1892            None,
1893        );
1894    }
1895}
1896
1897fn validate_connectors(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1898    let mut ids = BTreeSet::new();
1899    let provider_ids: BTreeSet<&str> = bundle
1900        .connectors
1901        .iter()
1902        .map(|connector| connector.provider_id.as_str())
1903        .collect();
1904    for (index, connector) in bundle.connectors.iter().enumerate() {
1905        let path = format!("connectors[{index}]");
1906        if connector.id.trim().is_empty() {
1907            push_error(
1908                report,
1909                format!("{path}.id"),
1910                "connector id is required",
1911                None,
1912            );
1913        } else if !ids.insert(connector.id.clone()) {
1914            push_error(
1915                report,
1916                format!("{path}.id"),
1917                format!("duplicate connector id: {}", connector.id),
1918                None,
1919            );
1920        }
1921        if connector.provider_id.trim().is_empty() {
1922            push_error(
1923                report,
1924                format!("{path}.provider_id"),
1925                "connector provider_id is required",
1926                None,
1927            );
1928        }
1929    }
1930    for trigger in &bundle.triggers {
1931        if let Some(provider) = trigger.provider.as_deref() {
1932            if !provider_ids.contains(provider) {
1933                push_warning(
1934                    report,
1935                    "connectors",
1936                    format!(
1937                        "trigger {} references provider {provider} with no connector requirement",
1938                        trigger.id
1939                    ),
1940                    trigger.node_id.clone(),
1941                );
1942            }
1943        }
1944    }
1945}
1946
1947fn validate_environment(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1948    if !matches!(
1949        bundle.environment.worktree_policy.as_str(),
1950        "reuse_current" | "new_worktree" | "host_managed"
1951    ) {
1952        push_error(
1953            report,
1954            "environment.worktree_policy",
1955            "worktree_policy must be reuse_current, new_worktree, or host_managed",
1956            None,
1957        );
1958    }
1959}
1960
1961fn workflow_diagnostic_node_id(message: &str, graph: &WorkflowGraph) -> Option<String> {
1962    for prefix in [
1963        "node is unreachable: ",
1964        "edge.from references unknown node: ",
1965        "edge.to references unknown node: ",
1966        "entry node does not exist: ",
1967    ] {
1968        if let Some(node_id) = message.strip_prefix(prefix) {
1969            return Some(node_id.to_string());
1970        }
1971    }
1972    if let Some(rest) = message.strip_prefix("node ") {
1973        if let Some((node_id, _)) = rest.split_once(':') {
1974            return Some(node_id.to_string());
1975        }
1976    }
1977    graph
1978        .nodes
1979        .keys()
1980        .find(|node_id| message.contains(&format!("node {node_id}:")))
1981        .cloned()
1982}
1983
1984fn workflow_graph_id(node_id: &str) -> String {
1985    format!("node/{node_id}")
1986}
1987
1988fn trigger_graph_id(trigger_id: &str) -> String {
1989    format!("trigger/{trigger_id}")
1990}
1991
1992fn connector_graph_id(connector_id: &str) -> String {
1993    format!("connector/{connector_id}")
1994}
1995
1996fn catchup_graph_id() -> String {
1997    "policy/catchup".to_string()
1998}
1999
2000fn dlq_graph_id() -> String {
2001    "policy/dlq".to_string()
2002}
2003
2004fn terminal_completed_graph_id() -> String {
2005    "terminal/completed".to_string()
2006}
2007
2008fn terminal_failed_graph_id() -> String {
2009    "terminal/failed".to_string()
2010}
2011
2012fn workflow_node_type(kind: &str) -> String {
2013    match kind {
2014        "action" => "action",
2015        "stage" | "agent" => "agent",
2016        "subagent" | "worker" => "subagent",
2017        "wait" | "waitpoint" | "delay" => "wait",
2018        "approval" | "hitl" => "approval",
2019        "connector" | "connector_call" => "connector_call",
2020        "notification" | "notify" => "notification",
2021        "terminal" | "success" | "failure" => "terminal",
2022        other if other.trim().is_empty() => "agent",
2023        other => other,
2024    }
2025    .to_string()
2026}
2027
2028fn workflow_node_label(node_id: &str, node: &super::WorkflowNode) -> String {
2029    node.task_label
2030        .clone()
2031        .or_else(|| node.prompt.clone())
2032        .map(|label| label.trim().to_string())
2033        .filter(|label| !label.is_empty())
2034        .unwrap_or_else(|| node_id.to_string())
2035}
2036
2037fn trigger_label(trigger: &WorkflowBundleTrigger) -> String {
2038    if !trigger.events.is_empty() {
2039        format!("{}: {}", trigger.kind, trigger.events.join(", "))
2040    } else if let Some(schedule) = trigger.schedule.as_deref() {
2041        format!("cron: {schedule}")
2042    } else if let Some(delay) = trigger.delay.as_deref() {
2043        format!("delay: {delay}")
2044    } else {
2045        trigger.id.clone()
2046    }
2047}
2048
2049fn connector_label(connector: &ConnectorRequirement) -> String {
2050    if connector.provider_id.is_empty() || connector.provider_id == connector.id {
2051        connector.id.clone()
2052    } else {
2053        format!("{} ({})", connector.id, connector.provider_id)
2054    }
2055}
2056
2057fn editable_field(
2058    id: impl Into<String>,
2059    label: impl Into<String>,
2060    json_pointer: impl Into<String>,
2061    value_type: impl Into<String>,
2062    required: bool,
2063    enum_values: &[&str],
2064) -> WorkflowBundleEditableField {
2065    WorkflowBundleEditableField {
2066        id: id.into(),
2067        label: label.into(),
2068        json_pointer: json_pointer.into(),
2069        value_type: value_type.into(),
2070        required,
2071        enum_values: enum_values
2072            .iter()
2073            .map(|value| (*value).to_string())
2074            .collect(),
2075    }
2076}
2077
2078fn json_pointer_segment(value: &str) -> String {
2079    value.replace('~', "~0").replace('/', "~1")
2080}
2081
2082fn trigger_editable_fields(
2083    index: usize,
2084    trigger: &WorkflowBundleTrigger,
2085) -> Vec<WorkflowBundleEditableField> {
2086    let base = format!("/triggers/{index}");
2087    let mut fields = vec![
2088        editable_field(
2089            format!("trigger.{}.kind", trigger.id),
2090            "Trigger kind",
2091            format!("{base}/kind"),
2092            "enum",
2093            true,
2094            &["github", "cron", "delay", "manual", "webhook", "mcp"],
2095        ),
2096        editable_field(
2097            format!("trigger.{}.node_id", trigger.id),
2098            "Target node",
2099            format!("{base}/node_id"),
2100            "string",
2101            false,
2102            &[],
2103        ),
2104    ];
2105    if trigger.provider.is_some() || trigger.kind == "github" {
2106        fields.push(editable_field(
2107            format!("trigger.{}.provider", trigger.id),
2108            "Provider",
2109            format!("{base}/provider"),
2110            "string",
2111            trigger.kind == "github",
2112            &[],
2113        ));
2114    }
2115    for (field, label, value_type) in [
2116        ("events", "Events", "list"),
2117        ("schedule", "Schedule", "string"),
2118        ("delay", "Delay", "string"),
2119        ("webhook_path", "Webhook path", "string"),
2120        ("mcp_tool", "MCP tool", "string"),
2121        ("resume_key", "Resume key", "string"),
2122        ("metadata", "Metadata", "object"),
2123    ] {
2124        fields.push(editable_field(
2125            format!("trigger.{}.{}", trigger.id, field),
2126            label,
2127            format!("{base}/{field}"),
2128            value_type,
2129            false,
2130            &[],
2131        ));
2132    }
2133    fields
2134}
2135
2136fn workflow_node_editable_fields(
2137    node_id: &str,
2138    capsule_id: Option<&String>,
2139) -> Vec<WorkflowBundleEditableField> {
2140    let escaped_node = json_pointer_segment(node_id);
2141    let mut fields = vec![
2142        editable_field(
2143            format!("workflow.{node_id}.task_label"),
2144            "Task label",
2145            format!("/workflow/nodes/{escaped_node}/task_label"),
2146            "string",
2147            false,
2148            &[],
2149        ),
2150        editable_field(
2151            format!("workflow.{node_id}.prompt"),
2152            "Prompt",
2153            format!("/workflow/nodes/{escaped_node}/prompt"),
2154            "string",
2155            false,
2156            &[],
2157        ),
2158        editable_field(
2159            format!("workflow.{node_id}.system"),
2160            "System prompt",
2161            format!("/workflow/nodes/{escaped_node}/system"),
2162            "string",
2163            false,
2164            &[],
2165        ),
2166        editable_field(
2167            format!("workflow.{node_id}.model_policy"),
2168            "Model policy",
2169            format!("/workflow/nodes/{escaped_node}/model_policy"),
2170            "object",
2171            false,
2172            &[],
2173        ),
2174        editable_field(
2175            format!("workflow.{node_id}.tools"),
2176            "Tool policy",
2177            format!("/workflow/nodes/{escaped_node}/tools"),
2178            "any",
2179            false,
2180            &[],
2181        ),
2182        editable_field(
2183            format!("workflow.{node_id}.capability_policy"),
2184            "Capability policy",
2185            format!("/workflow/nodes/{escaped_node}/capability_policy"),
2186            "object",
2187            false,
2188            &[],
2189        ),
2190        editable_field(
2191            format!("workflow.{node_id}.approval_policy"),
2192            "Approval policy",
2193            format!("/workflow/nodes/{escaped_node}/approval_policy"),
2194            "object",
2195            false,
2196            &[],
2197        ),
2198        editable_field(
2199            format!("workflow.{node_id}.retry_policy"),
2200            "Retry policy",
2201            format!("/workflow/nodes/{escaped_node}/retry_policy"),
2202            "object",
2203            false,
2204            &[],
2205        ),
2206    ];
2207    if let Some(capsule_id) = capsule_id {
2208        let escaped_capsule = json_pointer_segment(capsule_id);
2209        fields.extend([
2210            editable_field(
2211                format!("prompt_capsule.{capsule_id}.prompt"),
2212                "Prompt capsule",
2213                format!("/prompt_capsules/{escaped_capsule}/prompt"),
2214                "string",
2215                true,
2216                &[],
2217            ),
2218            editable_field(
2219                format!("prompt_capsule.{capsule_id}.system"),
2220                "Prompt capsule system",
2221                format!("/prompt_capsules/{escaped_capsule}/system"),
2222                "string",
2223                false,
2224                &[],
2225            ),
2226            editable_field(
2227                format!("prompt_capsule.{capsule_id}.context"),
2228                "Prompt capsule context",
2229                format!("/prompt_capsules/{escaped_capsule}/context"),
2230                "object",
2231                false,
2232                &[],
2233            ),
2234            editable_field(
2235                format!("prompt_capsule.{capsule_id}.trigger_id"),
2236                "Prompt capsule trigger",
2237                format!("/prompt_capsules/{escaped_capsule}/trigger_id"),
2238                "string",
2239                false,
2240                &[],
2241            ),
2242        ]);
2243    }
2244    fields
2245}
2246
2247fn connector_editable_fields(
2248    index: usize,
2249    connector: &ConnectorRequirement,
2250) -> Vec<WorkflowBundleEditableField> {
2251    let base = format!("/connectors/{index}");
2252    [
2253        ("id", "Connector id", "string", true),
2254        ("provider_id", "Provider id", "string", true),
2255        ("scopes", "Scopes", "list", false),
2256        ("setup_required", "Setup required", "bool", false),
2257        ("status_required", "Status required", "bool", false),
2258    ]
2259    .into_iter()
2260    .map(|(field, label, value_type, required)| {
2261        editable_field(
2262            format!("connector.{}.{}", connector.id, field),
2263            label,
2264            format!("{base}/{field}"),
2265            value_type,
2266            required,
2267            &[],
2268        )
2269    })
2270    .collect()
2271}
2272
2273fn retry_editable_fields() -> Vec<WorkflowBundleEditableField> {
2274    vec![
2275        editable_field(
2276            "policy.retry.max_attempts",
2277            "Retry attempts",
2278            "/policy/retry/max_attempts",
2279            "integer",
2280            true,
2281            &[],
2282        ),
2283        editable_field(
2284            "policy.retry.backoff",
2285            "Retry backoff",
2286            "/policy/retry/backoff",
2287            "string",
2288            true,
2289            &[],
2290        ),
2291    ]
2292}
2293
2294fn catchup_editable_fields() -> Vec<WorkflowBundleEditableField> {
2295    vec![
2296        editable_field(
2297            "policy.catchup.mode",
2298            "Catchup mode",
2299            "/policy/catchup/mode",
2300            "enum",
2301            true,
2302            &["none", "latest", "all"],
2303        ),
2304        editable_field(
2305            "policy.catchup.max_events",
2306            "Catchup max events",
2307            "/policy/catchup/max_events",
2308            "integer",
2309            false,
2310            &[],
2311        ),
2312    ]
2313}
2314
2315fn render_workflow_bundle_mermaid(
2316    nodes: &[WorkflowBundleGraphNode],
2317    edges: &[WorkflowBundleGraphEdge],
2318) -> String {
2319    let mut lines = vec!["flowchart TD".to_string()];
2320    for node in nodes {
2321        lines.push(format!(
2322            "  {}[\"{}\"]",
2323            mermaid_id(&node.id),
2324            mermaid_label(&format!("{}: {}", node.node_type, node.label))
2325        ));
2326    }
2327    for edge in edges {
2328        let label = edge
2329            .label
2330            .as_deref()
2331            .or(edge.branch.as_deref())
2332            .map(mermaid_label);
2333        match label {
2334            Some(label) if !label.is_empty() => lines.push(format!(
2335                "  {} -->|{}| {}",
2336                mermaid_id(&edge.from),
2337                label,
2338                mermaid_id(&edge.to)
2339            )),
2340            _ => lines.push(format!(
2341                "  {} --> {}",
2342                mermaid_id(&edge.from),
2343                mermaid_id(&edge.to)
2344            )),
2345        }
2346    }
2347    lines.join("\n")
2348}
2349
2350fn mermaid_id(value: &str) -> String {
2351    let digest = Sha256::digest(value.as_bytes());
2352    let suffix = digest
2353        .iter()
2354        .take(4)
2355        .map(|byte| format!("{byte:02x}"))
2356        .collect::<String>();
2357    let mut out = format!("n_{suffix}_");
2358    for ch in value.chars() {
2359        if ch.is_ascii_alphanumeric() {
2360            out.push(ch);
2361        } else {
2362            out.push('_');
2363        }
2364    }
2365    out
2366}
2367
2368fn mermaid_label(value: &str) -> String {
2369    value
2370        .replace('\\', "\\\\")
2371        .replace('"', "\\\"")
2372        .replace('\n', " ")
2373}
2374
2375fn triggers_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, Vec<String>> {
2376    let mut by_node: BTreeMap<String, Vec<String>> = BTreeMap::new();
2377    for trigger in &bundle.triggers {
2378        if let Some(node_id) = trigger.node_id.as_ref() {
2379            by_node
2380                .entry(node_id.clone())
2381                .or_default()
2382                .push(trigger.id.clone());
2383        }
2384    }
2385    by_node
2386}
2387
2388fn capsules_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, String> {
2389    bundle
2390        .prompt_capsules
2391        .iter()
2392        .map(|(id, capsule)| (capsule.node_id.clone(), id.clone()))
2393        .collect()
2394}
2395
2396fn execution_order(graph: &WorkflowGraph) -> Vec<String> {
2397    let outgoing =
2398        graph
2399            .edges
2400            .iter()
2401            .fold(BTreeMap::<String, Vec<String>>::new(), |mut acc, edge| {
2402                acc.entry(edge.from.clone())
2403                    .or_default()
2404                    .push(edge.to.clone());
2405                acc
2406            });
2407    let mut seen = BTreeSet::new();
2408    let mut queue = VecDeque::from([graph.entry.clone()]);
2409    let mut order = Vec::new();
2410    while let Some(node_id) = queue.pop_front() {
2411        if !graph.nodes.contains_key(&node_id) || !seen.insert(node_id.clone()) {
2412            continue;
2413        }
2414        order.push(node_id.clone());
2415        if let Some(next) = outgoing.get(&node_id) {
2416            let mut next = next.clone();
2417            next.sort();
2418            for child in next {
2419                queue.push_back(child);
2420            }
2421        }
2422    }
2423    order
2424}
2425
2426fn default_run_id(bundle: &WorkflowBundle, graph_digest: &str) -> String {
2427    let suffix = graph_digest
2428        .strip_prefix("sha256:")
2429        .unwrap_or(graph_digest)
2430        .chars()
2431        .take(12)
2432        .collect::<String>();
2433    format!("bundle_run_{}_{}", sanitize_id(&bundle.id), suffix)
2434}
2435
2436fn sanitize_id(value: &str) -> String {
2437    value
2438        .chars()
2439        .map(|ch| {
2440            if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' {
2441                ch
2442            } else {
2443                '_'
2444            }
2445        })
2446        .collect()
2447}
2448
2449fn push_error(
2450    report: &mut WorkflowBundleValidationReport,
2451    path: impl Into<String>,
2452    message: impl Into<String>,
2453    node_id: Option<String>,
2454) {
2455    report.errors.push(WorkflowBundleDiagnostic {
2456        severity: "error".to_string(),
2457        path: path.into(),
2458        message: message.into(),
2459        node_id,
2460    });
2461}
2462
2463fn push_warning(
2464    report: &mut WorkflowBundleValidationReport,
2465    path: impl Into<String>,
2466    message: impl Into<String>,
2467    node_id: Option<String>,
2468) {
2469    report.warnings.push(WorkflowBundleDiagnostic {
2470        severity: "warning".to_string(),
2471        path: path.into(),
2472        message: message.into(),
2473        node_id,
2474    });
2475}
2476
2477#[cfg(test)]
2478#[path = "workflow_bundle_tests.rs"]
2479mod workflow_bundle_tests;