Skip to main content

harn_vm/orchestration/
workflow_bundle.rs

1//! Portable workflow bundle contract, `.harnpack` container helpers, and
2//! deterministic local receipts.
3
4use std::collections::{BTreeMap, BTreeSet, VecDeque};
5use std::fs;
6use std::io::{Cursor, Read};
7use std::path::{Component, Path, PathBuf};
8
9use ed25519_dalek::{Signature, Verifier, VerifyingKey};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12
13use super::{validate_workflow, WorkflowEdge, WorkflowGraph};
14use crate::tool_annotations::ToolAnnotations;
15
/// Manifest schema version written by `WorkflowBundle::default()` and the
/// only version accepted by `parse_workflow_bundle_manifest`.
pub const WORKFLOW_BUNDLE_SCHEMA_VERSION: u32 = 2;
/// Type tag string for workflow bundle run receipts.
/// NOTE(review): presumably stored in `WorkflowBundleRunReceipt::receipt_type`;
/// the use site is not visible in this part of the file.
pub const WORKFLOW_BUNDLE_RECEIPT_TYPE: &str = "harn.workflow_bundle.run";
/// Archive-relative path of the manifest entry inside a `.harnpack` container.
pub const HARNPACK_MANIFEST_PATH: &str = "harnpack.json";

/// File mode used for the manifest entry, for entries created through
/// `HarnpackEntry::new`, and as the fallback when a tar header's mode cannot
/// be read.
const DEFAULT_HARNPACK_FILE_MODE: u32 = 0o644;
21
/// Manifest of a portable workflow bundle.
///
/// At the serde level every field is optional: `#[serde(default)]` fills any
/// missing field from this type's `Default` implementation. The parsing
/// helpers in this module additionally require `schema_version` to be present.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowBundle {
    /// Manifest schema version; `parse_workflow_bundle_manifest` rejects any
    /// value other than `WORKFLOW_BUNDLE_SCHEMA_VERSION`.
    pub schema_version: u32,
    pub entrypoint: PathBuf,
    /// Sorted by path and then by both hashes when the manifest is canonicalized.
    pub transitive_modules: Vec<ModuleEntry>,
    /// Defaults to this crate's `CARGO_PKG_VERSION`.
    pub stdlib_version: String,
    /// Defaults to this crate's `CARGO_PKG_VERSION`.
    pub harn_version: String,
    pub provider_catalog_hash: String,
    /// Sorted by name, provider and schema hash when the manifest is canonicalized.
    pub tool_manifest: Vec<ToolEntry>,
    /// Package and relationship lists are sorted when the manifest is canonicalized.
    pub sbom: SBOMDoc,
    /// Detached signature; cleared before `workflow_bundle_hash` hashes the manifest.
    pub signature: Option<Ed25519Signature>,
    pub parent_trust_record_id: Option<String>,
    pub id: String,
    pub name: Option<String>,
    pub version: String,
    pub triggers: Vec<WorkflowBundleTrigger>,
    /// Workflow graph; canonicalized before digesting, validation and serialization.
    pub workflow: WorkflowGraph,
    pub prompt_capsules: BTreeMap<String, PromptCapsule>,
    pub policy: WorkflowBundlePolicy,
    pub connectors: Vec<ConnectorRequirement>,
    pub environment: EnvironmentRequirements,
    /// Replay metadata; its digest and version, when set, are checked by
    /// `validate_workflow_bundle`.
    pub receipts: WorkflowBundleReplayMetadata,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
47
48impl Default for WorkflowBundle {
49    fn default() -> Self {
50        Self {
51            schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
52            entrypoint: PathBuf::new(),
53            transitive_modules: Vec::new(),
54            stdlib_version: env!("CARGO_PKG_VERSION").to_string(),
55            harn_version: env!("CARGO_PKG_VERSION").to_string(),
56            provider_catalog_hash: String::new(),
57            tool_manifest: Vec::new(),
58            sbom: SBOMDoc::default(),
59            signature: None,
60            parent_trust_record_id: None,
61            id: String::new(),
62            name: None,
63            version: String::new(),
64            triggers: Vec::new(),
65            workflow: WorkflowGraph::default(),
66            prompt_capsules: BTreeMap::new(),
67            policy: WorkflowBundlePolicy::default(),
68            connectors: Vec::new(),
69            environment: EnvironmentRequirements::default(),
70            receipts: WorkflowBundleReplayMetadata::default(),
71            metadata: BTreeMap::new(),
72        }
73    }
74}
75
/// One entry of `WorkflowBundle::transitive_modules`.
///
/// Canonicalization orders these entries by `path`, then `source_hash_blake3`,
/// then `harnbc_hash_blake3`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ModuleEntry {
    pub path: PathBuf,
    // NOTE(review): presumably a BLAKE3 digest of the module source; the format is not checked here.
    pub source_hash_blake3: String,
    // NOTE(review): presumably a BLAKE3 digest of the compiled bytecode; the format is not checked here.
    pub harnbc_hash_blake3: String,
}
83
/// One entry of `WorkflowBundle::tool_manifest`.
///
/// Canonicalization orders these entries by `name`, then `provider`, then
/// `schema_hash_blake3`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolEntry {
    pub name: String,
    pub provider: Option<String>,
    pub annotations: Option<ToolAnnotations>,
    pub schema_hash_blake3: Option<String>,
    pub metadata: BTreeMap<String, String>,
}
93
/// Software bill of materials carried in `WorkflowBundle::sbom`.
///
/// `packages` and `relationships` are sorted when the manifest is
/// canonicalized, so their input order does not affect the canonical bytes.
// The acronym spelling is kept as-is; the lint is silenced rather than renaming the type.
#[allow(clippy::upper_case_acronyms)]
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct SBOMDoc {
    pub format: String,
    pub version: String,
    pub packages: Vec<SBOMPackage>,
    pub relationships: Vec<SBOMRelationship>,
}
103
/// One package listed in `SBOMDoc::packages`.
///
/// Canonicalization orders packages by `name`, then `version`, then
/// `package_hash_blake3`; `license` does not take part in the ordering.
// The acronym spelling is kept as-is; the lint is silenced rather than renaming the type.
#[allow(clippy::upper_case_acronyms)]
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct SBOMPackage {
    pub name: String,
    pub version: Option<String>,
    pub package_hash_blake3: Option<String>,
    pub license: Option<String>,
}
113
/// One edge listed in `SBOMDoc::relationships`.
///
/// Canonicalization orders relationships by `from`, then `to`, then
/// `relationship_type`.
// The acronym spelling is kept as-is; the lint is silenced rather than renaming the type.
#[allow(clippy::upper_case_acronyms)]
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct SBOMRelationship {
    pub from: String,
    pub to: String,
    pub relationship_type: String,
}
122
/// Detached signature stored in `WorkflowBundle::signature` and checked by
/// `verify_workflow_bundle_signature`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct Ed25519Signature {
    /// Not read by the verification code in this file section.
    pub key_id: Option<String>,
    /// Hex string; verification requires it to decode to exactly 32 bytes.
    pub public_key: String,
    /// Hex string; verification requires it to decode to exactly 64 bytes.
    pub signature: String,
    /// Must equal the value computed by `workflow_bundle_hash`; that string's
    /// bytes are the message the signature is verified against.
    pub manifest_hash_blake3: String,
    /// Verification accepts only the exact value `"ed25519"`.
    pub algorithm: String,
}
132
/// Trigger declaration listed in `WorkflowBundle::triggers`.
///
/// NOTE(review): the rules tying `kind` to the optional fields live in
/// validation helpers that are not visible in this part of the file.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleTrigger {
    pub id: String,
    pub kind: String,
    pub provider: Option<String>,
    pub events: Vec<String>,
    pub schedule: Option<String>,
    pub delay: Option<String>,
    pub webhook_path: Option<String>,
    pub mcp_tool: Option<String>,
    pub resume_key: Option<String>,
    // NOTE(review): presumably the workflow node this trigger starts; confirm against the trigger validator.
    pub node_id: Option<String>,
    pub metadata: BTreeMap<String, String>,
}
148
/// Prompt payload stored in `WorkflowBundle::prompt_capsules`.
///
/// NOTE(review): how `id` relates to the map key, and how `node_id` /
/// `trigger_id` are resolved, is enforced by helpers outside this section.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct PromptCapsule {
    pub id: String,
    pub node_id: String,
    pub trigger_id: Option<String>,
    pub prompt: String,
    pub system: Option<String>,
    pub context: BTreeMap<String, serde_json::Value>,
}
159
/// Execution policy carried in `WorkflowBundle::policy` and copied into run
/// receipts (`WorkflowBundleRunReceipt::policy`).
///
/// Missing fields deserialize from this type's `Default` implementation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowBundlePolicy {
    /// Defaults to `"act_with_approval"`.
    pub autonomy_tier: String,
    pub tool_policy: BTreeMap<String, serde_json::Value>,
    pub approval_required: Vec<String>,
    pub retry: RetryPolicySpec,
    pub catchup: CatchupPolicySpec,
}
169
170impl Default for WorkflowBundlePolicy {
171    fn default() -> Self {
172        Self {
173            autonomy_tier: "act_with_approval".to_string(),
174            tool_policy: BTreeMap::new(),
175            approval_required: Vec::new(),
176            retry: RetryPolicySpec::default(),
177            catchup: CatchupPolicySpec::default(),
178        }
179    }
180}
181
/// Retry settings nested in `WorkflowBundlePolicy::retry`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RetryPolicySpec {
    /// Defaults to `1`.
    pub max_attempts: u32,
    /// Defaults to `"none"`.
    pub backoff: String,
}
188
189impl Default for RetryPolicySpec {
190    fn default() -> Self {
191        Self {
192            max_attempts: 1,
193            backoff: "none".to_string(),
194        }
195    }
196}
197
/// Catch-up settings nested in `WorkflowBundlePolicy::catchup`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CatchupPolicySpec {
    /// Defaults to `"latest"`.
    pub mode: String,
    /// Defaults to `Some(1)`.
    pub max_events: Option<u32>,
}
204
205impl Default for CatchupPolicySpec {
206    fn default() -> Self {
207        Self {
208            mode: "latest".to_string(),
209            max_events: Some(1),
210        }
211    }
212}
213
/// Connector requirement listed in `WorkflowBundle::connectors` and echoed
/// into previews and run receipts.
///
/// All fields default to empty / `false` when absent from the manifest.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ConnectorRequirement {
    pub id: String,
    pub provider_id: String,
    pub scopes: Vec<String>,
    pub setup_required: bool,
    pub status_required: bool,
}
223
/// Environment requirements carried in `WorkflowBundle::environment` and
/// echoed into previews and run receipts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EnvironmentRequirements {
    pub repo_setup_profile: Option<String>,
    /// Defaults to `"host_managed"`.
    pub worktree_policy: String,
    pub command_gates: Vec<String>,
}
231
232impl Default for EnvironmentRequirements {
233    fn default() -> Self {
234        Self {
235            repo_setup_profile: None,
236            worktree_policy: "host_managed".to_string(),
237            command_gates: Vec::new(),
238        }
239    }
240}
241
/// Replay metadata carried in `WorkflowBundle::receipts`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleReplayMetadata {
    pub run_id: Option<String>,
    pub event_ids: Vec<String>,
    /// When set, `validate_workflow_bundle` reports an error unless it equals
    /// the canonical workflow graph's version.
    pub workflow_version: Option<usize>,
    /// When set, `validate_workflow_bundle` reports an error unless it equals
    /// the digest computed by `workflow_graph_digest`.
    pub graph_digest: Option<String>,
}
250
/// A single error or warning entry of a `WorkflowBundleValidationReport`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleDiagnostic {
    // NOTE(review): presumably "error" or "warning"; the values are set by push helpers outside this section.
    pub severity: String,
    /// Location label of the finding, e.g. `"workflow"` or
    /// `"receipts.graph_digest"` as passed by `validate_workflow_bundle`.
    pub path: String,
    pub message: String,
    pub node_id: Option<String>,
}
259
/// Result of `validate_workflow_bundle`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleValidationReport {
    /// `true` exactly when `errors` is empty at the end of validation.
    pub valid: bool,
    /// Copy of `WorkflowBundle::id`.
    pub bundle_id: String,
    /// Id of the canonicalized workflow graph.
    pub workflow_id: String,
    /// Digest of the canonicalized workflow graph (`workflow_graph_digest`).
    pub graph_digest: String,
    pub errors: Vec<WorkflowBundleDiagnostic>,
    pub warnings: Vec<WorkflowBundleDiagnostic>,
}
270
/// Preview document returned by `preview_workflow_bundle`.
///
/// Unlike the manifest types, this struct has no `#[serde(default)]`, so
/// every field must be present when deserializing.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct WorkflowBundlePreview {
    pub schema_version: u32,
    pub bundle_id: String,
    pub bundle_version: String,
    pub workflow_id: String,
    pub workflow_version: usize,
    pub graph_digest: String,
    /// Validation report computed for the same bundle.
    pub validation: WorkflowBundleValidationReport,
    pub graph: WorkflowBundleGraphExport,
    /// Copy of `graph.mermaid`.
    pub mermaid: String,
    pub triggers: Vec<WorkflowBundleTrigger>,
    pub connectors: Vec<ConnectorRequirement>,
    pub environment: EnvironmentRequirements,
    pub nodes: Vec<WorkflowBundlePreviewNode>,
    pub edges: Vec<WorkflowEdge>,
}
288
/// Per-node summary listed in `WorkflowBundlePreview::nodes`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundlePreviewNode {
    pub id: String,
    pub kind: String,
    pub label: Option<String>,
    // NOTE(review): presumably the id of the prompt capsule attached to this node; confirm in `preview_workflow_bundle`.
    pub prompt_capsule: Option<String>,
    pub trigger_ids: Vec<String>,
    // NOTE(review): presumably the ids of nodes reachable over outgoing edges; confirm in `preview_workflow_bundle`.
    pub outgoing: Vec<String>,
}
298
/// Graph export embedded in `WorkflowBundlePreview::graph`; built by
/// `export_workflow_bundle_graph` from a bundle and its validation report.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct WorkflowBundleGraphExport {
    pub schema_version: u32,
    pub graph_id: String,
    pub graph_digest: String,
    pub nodes: Vec<WorkflowBundleGraphNode>,
    pub edges: Vec<WorkflowBundleGraphEdge>,
    pub diagnostics: Vec<WorkflowBundleGraphDiagnostic>,
    pub editable_fields: Vec<WorkflowBundleEditableField>,
    /// Mermaid rendering of the graph; the preview copies this string verbatim.
    pub mermaid: String,
}
310
/// Node of a `WorkflowBundleGraphExport`.
///
/// NOTE(review): the three optional id fields suggest a node may stand for a
/// workflow node, a trigger, or a connector; confirm against the exporter.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct WorkflowBundleGraphNode {
    pub id: String,
    pub node_type: String,
    pub label: String,
    pub workflow_node_id: Option<String>,
    pub trigger_id: Option<String>,
    pub connector_id: Option<String>,
    pub editable_fields: Vec<WorkflowBundleEditableField>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
322
/// Directed edge of a `WorkflowBundleGraphExport`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleGraphEdge {
    // NOTE(review): presumably graph node ids (`WorkflowBundleGraphNode::id`) rather than workflow node ids; confirm against the exporter.
    pub from: String,
    pub to: String,
    pub label: Option<String>,
    pub branch: Option<String>,
}
330
/// Diagnostic attached to a `WorkflowBundleGraphExport`.
///
/// Carries the same fields as `WorkflowBundleDiagnostic` plus
/// `graph_node_id`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleGraphDiagnostic {
    pub severity: String,
    pub path: String,
    pub message: String,
    pub node_id: Option<String>,
    // NOTE(review): presumably the exported graph node the diagnostic maps to; confirm against the exporter.
    pub graph_node_id: Option<String>,
}
339
/// Description of a manifest field that a graph export marks as editable.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleEditableField {
    pub id: String,
    pub label: String,
    // NOTE(review): presumably a JSON Pointer into the bundle manifest; confirm against the exporter.
    pub json_pointer: String,
    pub value_type: String,
    pub required: bool,
    pub enum_values: Vec<String>,
}
349
/// Parameters of a bundle run request; both fields are optional and default
/// to `None`.
///
/// NOTE(review): the function that consumes this request is not visible in
/// this part of the file.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleRunRequest {
    pub trigger_id: Option<String>,
    pub event_id: Option<String>,
}
356
/// Receipt describing one run of a workflow bundle.
///
/// NOTE(review): the function that produces receipts is not visible in this
/// part of the file; field notes below are limited to what the types show.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct WorkflowBundleRunReceipt {
    pub schema_version: u32,
    // NOTE(review): presumably `WORKFLOW_BUNDLE_RECEIPT_TYPE`; confirm at the construction site.
    pub receipt_type: String,
    pub bundle_id: String,
    pub bundle_version: String,
    pub workflow_id: String,
    pub workflow_version: usize,
    pub graph_digest: String,
    pub run_id: String,
    pub trigger_id: Option<String>,
    pub event_ids: Vec<String>,
    pub status: String,
    pub executed_nodes: Vec<WorkflowBundleRunNodeReceipt>,
    pub policy: WorkflowBundlePolicy,
    pub connectors: Vec<ConnectorRequirement>,
    pub environment: EnvironmentRequirements,
}
375
/// Per-node record listed in `WorkflowBundleRunReceipt::executed_nodes`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleRunNodeReceipt {
    pub node_id: String,
    pub kind: String,
    pub prompt_capsule: Option<String>,
    pub status: String,
}
383
/// A non-manifest file stored in a `.harnpack` archive.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HarnpackEntry {
    /// Archive path; must pass `normalize_archive_path` (relative, no `..`,
    /// valid UTF-8) when the archive is built.
    pub path: PathBuf,
    /// Raw file contents.
    pub bytes: Vec<u8>,
    /// Mode written into the tar header for this entry.
    pub mode: u32,
}
390
391impl HarnpackEntry {
392    pub fn new(path: impl Into<PathBuf>, bytes: impl Into<Vec<u8>>) -> Self {
393        Self {
394            path: path.into(),
395            bytes: bytes.into(),
396            mode: DEFAULT_HARNPACK_FILE_MODE,
397        }
398    }
399
400    pub fn with_mode(mut self, mode: u32) -> Self {
401        self.mode = mode;
402        self
403    }
404}
405
/// Decoded `.harnpack` container, as returned by `read_harnpack`.
#[derive(Clone, Debug, PartialEq)]
pub struct HarnpackArchive {
    /// Manifest parsed from the `harnpack.json` entry.
    pub manifest: WorkflowBundle,
    /// Every other file entry; `read_harnpack` returns them sorted by path.
    pub contents: Vec<HarnpackEntry>,
}
411
/// Machine-readable category of a `WorkflowBundleError`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WorkflowBundleErrorKind {
    /// Converted from a `std::io::Error`.
    Io,
    /// Converted from a `serde_json::Error`.
    Json,
    /// The manifest has no (numeric) `schema_version` field.
    MissingSchemaVersion,
    /// The manifest's `schema_version` differs from the supported one.
    /// `actual` is `u32::MAX` when the manifest value does not fit in `u32`.
    UnsupportedSchemaVersion { actual: u32, expected: u32 },
    /// The archive is structurally unusable, e.g. it has no manifest entry.
    InvalidArchive,
    /// Two entries share a path, or a content entry collides with the manifest path.
    DuplicateArchiveEntry,
    /// An entry path is empty, absolute, not valid UTF-8, or contains `..`.
    UnsafeArchivePath,
    /// The bundle is unsigned or its signature failed a verification step.
    InvalidSignature,
}
423
/// Error type returned by the bundle and `.harnpack` helpers in this module.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WorkflowBundleError {
    /// Category callers can match on.
    pub kind: WorkflowBundleErrorKind,
    /// Human-readable description; this is what `Display` prints.
    pub message: String,
}
429
430impl WorkflowBundleError {
431    fn new(kind: WorkflowBundleErrorKind, message: impl Into<String>) -> Self {
432        Self {
433            kind,
434            message: message.into(),
435        }
436    }
437
438    fn unsupported_schema_version(actual: u32) -> Self {
439        Self::new(
440            WorkflowBundleErrorKind::UnsupportedSchemaVersion {
441                actual,
442                expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
443            },
444            format!(
445                "unsupported workflow bundle schema_version {actual}; expected {}",
446                WORKFLOW_BUNDLE_SCHEMA_VERSION
447            ),
448        )
449    }
450}
451
452impl std::fmt::Display for WorkflowBundleError {
453    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454        self.message.fmt(f)
455    }
456}
457
// Marker impl only: the type stores its cause as text in `message`, so the
// default `source()` (returning `None`) is used.
impl std::error::Error for WorkflowBundleError {}
459
460impl From<std::io::Error> for WorkflowBundleError {
461    fn from(error: std::io::Error) -> Self {
462        Self::new(WorkflowBundleErrorKind::Io, error.to_string())
463    }
464}
465
466impl From<serde_json::Error> for WorkflowBundleError {
467    fn from(error: serde_json::Error) -> Self {
468        Self::new(WorkflowBundleErrorKind::Json, error.to_string())
469    }
470}
471
472pub fn load_workflow_bundle(path: &Path) -> Result<WorkflowBundle, WorkflowBundleError> {
473    let bytes = fs::read(path)?;
474    if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
475        read_harnpack(&bytes).map(|archive| archive.manifest)
476    } else {
477        parse_workflow_bundle_manifest(&bytes)
478    }
479}
480
481/// Read a manifest from any prior or current schema version without
482/// rejecting on a `schema_version` mismatch. Used by `harn pack
483/// --upgrade` to read v1 bundles before re-emitting them under the v2
484/// shape. Fields the older schema didn't carry deserialize to their
485/// type defaults via `#[serde(default)]`.
486pub fn read_workflow_bundle_manifest_any_version(
487    bytes: &[u8],
488) -> Result<WorkflowBundle, WorkflowBundleError> {
489    let value: serde_json::Value = serde_json::from_slice(bytes)?;
490    if value.get("schema_version").is_none() {
491        return Err(WorkflowBundleError::new(
492            WorkflowBundleErrorKind::MissingSchemaVersion,
493            "workflow bundle manifest is missing schema_version",
494        ));
495    }
496    serde_json::from_value(value).map_err(Into::into)
497}
498
499/// Variant of [`load_workflow_bundle`] that accepts any historical
500/// schema version. Reads the manifest from a `.harnpack` archive or a
501/// bare JSON manifest as appropriate.
502pub fn load_workflow_bundle_any_version(
503    path: &Path,
504) -> Result<WorkflowBundle, WorkflowBundleError> {
505    let bytes = fs::read(path)?;
506    if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
507        let tar_bytes = zstd::stream::decode_all(Cursor::new(bytes))?;
508        let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
509        for entry in archive.entries()? {
510            let mut entry = entry?;
511            if entry.header().entry_type().is_dir() {
512                continue;
513            }
514            let path = normalize_archive_path(entry.path()?.as_ref())?;
515            if path == HARNPACK_MANIFEST_PATH {
516                let mut entry_bytes = Vec::new();
517                entry.read_to_end(&mut entry_bytes)?;
518                return read_workflow_bundle_manifest_any_version(&entry_bytes);
519            }
520        }
521        Err(WorkflowBundleError::new(
522            WorkflowBundleErrorKind::InvalidArchive,
523            format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
524        ))
525    } else {
526        read_workflow_bundle_manifest_any_version(&bytes)
527    }
528}
529
530pub fn parse_workflow_bundle_manifest(bytes: &[u8]) -> Result<WorkflowBundle, WorkflowBundleError> {
531    let value: serde_json::Value = serde_json::from_slice(bytes)?;
532    let schema_version = value
533        .get("schema_version")
534        .and_then(serde_json::Value::as_u64)
535        .ok_or_else(|| {
536            WorkflowBundleError::new(
537                WorkflowBundleErrorKind::MissingSchemaVersion,
538                "workflow bundle manifest is missing numeric schema_version",
539            )
540        })?;
541    let actual = u32::try_from(schema_version).map_err(|_| {
542        WorkflowBundleError::new(
543            WorkflowBundleErrorKind::UnsupportedSchemaVersion {
544                actual: u32::MAX,
545                expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
546            },
547            format!(
548                "unsupported workflow bundle schema_version {schema_version}; expected {}",
549                WORKFLOW_BUNDLE_SCHEMA_VERSION
550            ),
551        )
552    })?;
553    if actual != WORKFLOW_BUNDLE_SCHEMA_VERSION {
554        return Err(WorkflowBundleError::unsupported_schema_version(actual));
555    }
556    serde_json::from_value(value).map_err(Into::into)
557}
558
559pub fn canonical_workflow_bundle_manifest_bytes(
560    bundle: &WorkflowBundle,
561) -> Result<Vec<u8>, WorkflowBundleError> {
562    serde_json::to_vec(&canonical_workflow_bundle_manifest(bundle)).map_err(Into::into)
563}
564
565pub fn workflow_bundle_hash(
566    bundle: &WorkflowBundle,
567    contents: &[HarnpackEntry],
568) -> Result<String, WorkflowBundleError> {
569    let mut hasher = blake3::Hasher::new();
570    let mut canonical = canonical_workflow_bundle_manifest(bundle);
571    canonical.signature = None;
572    let manifest_bytes = serde_json::to_vec(&canonical)?;
573    hasher.update(&manifest_bytes);
574
575    let mut content_hashes = contents
576        .iter()
577        .map(|entry| blake3_hash_bytes(&entry.bytes))
578        .collect::<Vec<_>>();
579    content_hashes.sort();
580    for content_hash in content_hashes {
581        hasher.update(b"\n");
582        hasher.update(content_hash.as_bytes());
583    }
584
585    Ok(blake3_digest_string(hasher.finalize()))
586}
587
588pub fn verify_workflow_bundle_signature(
589    bundle: &WorkflowBundle,
590    contents: &[HarnpackEntry],
591) -> Result<(), WorkflowBundleError> {
592    let signature = bundle.signature.as_ref().ok_or_else(|| {
593        WorkflowBundleError::new(
594            WorkflowBundleErrorKind::InvalidSignature,
595            "workflow bundle is unsigned",
596        )
597    })?;
598    if signature.algorithm != "ed25519" {
599        return Err(WorkflowBundleError::new(
600            WorkflowBundleErrorKind::InvalidSignature,
601            format!(
602                "unsupported workflow bundle signature algorithm {}",
603                signature.algorithm
604            ),
605        ));
606    }
607    let expected_hash = workflow_bundle_hash(bundle, contents)?;
608    if signature.manifest_hash_blake3 != expected_hash {
609        return Err(WorkflowBundleError::new(
610            WorkflowBundleErrorKind::InvalidSignature,
611            format!(
612                "workflow bundle signature hash mismatch; expected {expected_hash}, found {}",
613                signature.manifest_hash_blake3
614            ),
615        ));
616    }
617    let public_key_bytes = decode_hex_exact::<32>(
618        "workflow bundle signature public_key",
619        &signature.public_key,
620    )?;
621    let signature_bytes =
622        decode_hex_exact::<64>("workflow bundle signature", &signature.signature)?;
623    let verifying_key = VerifyingKey::from_bytes(&public_key_bytes).map_err(|error| {
624        WorkflowBundleError::new(
625            WorkflowBundleErrorKind::InvalidSignature,
626            format!("workflow bundle signature public_key is invalid Ed25519: {error}"),
627        )
628    })?;
629    let ed25519_signature = Signature::from_bytes(&signature_bytes);
630    verifying_key
631        .verify(expected_hash.as_bytes(), &ed25519_signature)
632        .map_err(|error| {
633            WorkflowBundleError::new(
634                WorkflowBundleErrorKind::InvalidSignature,
635                format!("workflow bundle signature failed Ed25519 verification: {error}"),
636            )
637        })
638}
639
640pub fn build_harnpack(
641    bundle: &WorkflowBundle,
642    contents: &[HarnpackEntry],
643) -> Result<Vec<u8>, WorkflowBundleError> {
644    let manifest_bytes = canonical_workflow_bundle_manifest_bytes(bundle)?;
645    let mut entries = contents
646        .iter()
647        .map(|entry| {
648            normalize_archive_path(&entry.path).map(|path| (path, entry.bytes.clone(), entry.mode))
649        })
650        .collect::<Result<Vec<_>, _>>()?;
651    entries.sort_by(|left, right| left.0.cmp(&right.0));
652
653    let mut seen = BTreeSet::new();
654    for (path, _, _) in &entries {
655        if path == HARNPACK_MANIFEST_PATH {
656            return Err(WorkflowBundleError::new(
657                WorkflowBundleErrorKind::DuplicateArchiveEntry,
658                format!("archive content cannot replace {HARNPACK_MANIFEST_PATH}"),
659            ));
660        }
661        if !seen.insert(path.clone()) {
662            return Err(WorkflowBundleError::new(
663                WorkflowBundleErrorKind::DuplicateArchiveEntry,
664                format!("duplicate archive entry: {path}"),
665            ));
666        }
667    }
668
669    let mut tar_bytes = Vec::new();
670    {
671        let mut builder = tar::Builder::new(&mut tar_bytes);
672        append_harnpack_entry(
673            &mut builder,
674            HARNPACK_MANIFEST_PATH,
675            &manifest_bytes,
676            DEFAULT_HARNPACK_FILE_MODE,
677        )?;
678        for (path, bytes, mode) in entries {
679            append_harnpack_entry(&mut builder, &path, &bytes, mode)?;
680        }
681        builder.finish()?;
682    }
683
684    zstd::stream::encode_all(Cursor::new(tar_bytes), 0).map_err(Into::into)
685}
686
687pub fn read_harnpack(bytes: &[u8]) -> Result<HarnpackArchive, WorkflowBundleError> {
688    let tar_bytes = zstd::stream::decode_all(Cursor::new(bytes))?;
689    let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
690    let mut manifest = None;
691    let mut contents = Vec::new();
692    let mut seen = BTreeSet::new();
693
694    for entry in archive.entries()? {
695        let mut entry = entry?;
696        if entry.header().entry_type().is_dir() {
697            continue;
698        }
699        let path = normalize_archive_path(entry.path()?.as_ref())?;
700        if !seen.insert(path.clone()) {
701            return Err(WorkflowBundleError::new(
702                WorkflowBundleErrorKind::DuplicateArchiveEntry,
703                format!("duplicate archive entry: {path}"),
704            ));
705        }
706        let mode = entry.header().mode().unwrap_or(DEFAULT_HARNPACK_FILE_MODE);
707        let mut entry_bytes = Vec::new();
708        entry.read_to_end(&mut entry_bytes)?;
709
710        if path == HARNPACK_MANIFEST_PATH {
711            manifest = Some(parse_workflow_bundle_manifest(&entry_bytes)?);
712        } else {
713            contents.push(HarnpackEntry {
714                path: PathBuf::from(path),
715                bytes: entry_bytes,
716                mode,
717            });
718        }
719    }
720
721    contents.sort_by(|left, right| left.path.cmp(&right.path));
722    Ok(HarnpackArchive {
723        manifest: manifest.ok_or_else(|| {
724            WorkflowBundleError::new(
725                WorkflowBundleErrorKind::InvalidArchive,
726                format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
727            )
728        })?,
729        contents,
730    })
731}
732
733pub fn current_provider_catalog_hash_blake3() -> Result<String, WorkflowBundleError> {
734    let bytes = serde_json::to_vec(&crate::provider_catalog::artifact())?;
735    Ok(blake3_hash_bytes(&bytes))
736}
737
738pub fn workflow_graph_digest(graph: &WorkflowGraph) -> String {
739    let mut canonical = canonical_workflow_graph(graph);
740    canonical.audit_log.clear();
741    let bytes = serde_json::to_vec(&canonical).expect("workflow graph serializes");
742    let digest = Sha256::digest(bytes);
743    let hex = digest
744        .iter()
745        .map(|byte| format!("{byte:02x}"))
746        .collect::<String>();
747    format!("sha256:{hex}")
748}
749
750fn canonical_workflow_bundle_manifest(bundle: &WorkflowBundle) -> WorkflowBundle {
751    let mut canonical = bundle.clone();
752    canonical.workflow = canonical_workflow_graph(&bundle.workflow);
753    canonical.transitive_modules.sort_by(|left, right| {
754        (
755            path_sort_key(&left.path),
756            &left.source_hash_blake3,
757            &left.harnbc_hash_blake3,
758        )
759            .cmp(&(
760                path_sort_key(&right.path),
761                &right.source_hash_blake3,
762                &right.harnbc_hash_blake3,
763            ))
764    });
765    canonical.tool_manifest.sort_by(|left, right| {
766        (&left.name, &left.provider, &left.schema_hash_blake3).cmp(&(
767            &right.name,
768            &right.provider,
769            &right.schema_hash_blake3,
770        ))
771    });
772    canonical.sbom.packages.sort_by(|left, right| {
773        (&left.name, &left.version, &left.package_hash_blake3).cmp(&(
774            &right.name,
775            &right.version,
776            &right.package_hash_blake3,
777        ))
778    });
779    canonical.sbom.relationships.sort_by(|left, right| {
780        (&left.from, &left.to, &left.relationship_type).cmp(&(
781            &right.from,
782            &right.to,
783            &right.relationship_type,
784        ))
785    });
786    canonical
787}
788
789fn decode_hex_exact<const N: usize>(
790    label: &str,
791    value: &str,
792) -> Result<[u8; N], WorkflowBundleError> {
793    let bytes = hex::decode(value).map_err(|error| {
794        WorkflowBundleError::new(
795            WorkflowBundleErrorKind::InvalidSignature,
796            format!("{label} is not hex: {error}"),
797        )
798    })?;
799    bytes.try_into().map_err(|bytes: Vec<u8>| {
800        WorkflowBundleError::new(
801            WorkflowBundleErrorKind::InvalidSignature,
802            format!("{label} must decode to {N} bytes, got {}", bytes.len()),
803        )
804    })
805}
806
807fn append_harnpack_entry<W: std::io::Write>(
808    builder: &mut tar::Builder<W>,
809    path: &str,
810    bytes: &[u8],
811    mode: u32,
812) -> Result<(), WorkflowBundleError> {
813    let mut header = tar::Header::new_gnu();
814    header.set_path(path).map_err(|error| {
815        WorkflowBundleError::new(
816            WorkflowBundleErrorKind::UnsafeArchivePath,
817            format!("invalid archive path {path}: {error}"),
818        )
819    })?;
820    header.set_entry_type(tar::EntryType::Regular);
821    header.set_size(bytes.len() as u64);
822    header.set_mode(mode);
823    header.set_mtime(0);
824    header.set_uid(0);
825    header.set_gid(0);
826    header.set_cksum();
827    builder.append(&header, bytes)?;
828    Ok(())
829}
830
831fn normalize_archive_path(path: &Path) -> Result<String, WorkflowBundleError> {
832    let mut parts = Vec::new();
833    for component in path.components() {
834        match component {
835            Component::Normal(part) => {
836                let Some(part) = part.to_str() else {
837                    return Err(WorkflowBundleError::new(
838                        WorkflowBundleErrorKind::UnsafeArchivePath,
839                        format!("archive path is not valid UTF-8: {}", path.display()),
840                    ));
841                };
842                if part.is_empty() {
843                    return Err(WorkflowBundleError::new(
844                        WorkflowBundleErrorKind::UnsafeArchivePath,
845                        "archive path contains an empty component",
846                    ));
847                }
848                parts.push(part.to_string());
849            }
850            Component::CurDir => {}
851            Component::ParentDir | Component::RootDir | Component::Prefix(_) => {
852                return Err(WorkflowBundleError::new(
853                    WorkflowBundleErrorKind::UnsafeArchivePath,
854                    format!(
855                        "archive path must be relative and contained: {}",
856                        path.display()
857                    ),
858                ));
859            }
860        }
861    }
862    if parts.is_empty() {
863        return Err(WorkflowBundleError::new(
864            WorkflowBundleErrorKind::UnsafeArchivePath,
865            "archive path is empty",
866        ));
867    }
868    Ok(parts.join("/"))
869}
870
/// Builds the string used to order module paths during canonicalization.
///
/// Only normal components that are valid UTF-8 are kept, joined with `/`;
/// root, prefix, `.` and `..` components and non-UTF-8 components are
/// silently dropped.
fn path_sort_key(path: &Path) -> String {
    let mut segments: Vec<&str> = Vec::new();
    for component in path.components() {
        if let Component::Normal(raw) = component {
            if let Some(text) = raw.to_str() {
                segments.push(text);
            }
        }
    }
    segments.join("/")
}
880
881fn blake3_hash_bytes(bytes: &[u8]) -> String {
882    blake3_digest_string(blake3::hash(bytes))
883}
884
885fn blake3_digest_string(hash: blake3::Hash) -> String {
886    format!("blake3:{hash}")
887}
888
889pub fn validate_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundleValidationReport {
890    let canonical = canonical_workflow_graph(&bundle.workflow);
891    let mut report = WorkflowBundleValidationReport {
892        valid: true,
893        bundle_id: bundle.id.clone(),
894        workflow_id: canonical.id.clone(),
895        graph_digest: workflow_graph_digest(&canonical),
896        errors: Vec::new(),
897        warnings: Vec::new(),
898    };
899
900    validate_manifest_contract(bundle, &mut report);
901    validate_bundle_identity(bundle, &canonical, &mut report);
902    validate_triggers(bundle, &canonical, &mut report);
903    validate_prompt_capsules(bundle, &canonical, &mut report);
904    validate_policy(bundle, &mut report);
905    validate_connectors(bundle, &mut report);
906    validate_environment(bundle, &mut report);
907
908    let graph_report = validate_workflow(&canonical, None);
909    for error in graph_report.errors {
910        let node_id = workflow_diagnostic_node_id(&error, &canonical);
911        push_error(&mut report, "workflow", error, node_id);
912    }
913    for warning in graph_report.warnings {
914        let node_id = workflow_diagnostic_node_id(&warning, &canonical);
915        push_warning(&mut report, "workflow", warning, node_id);
916    }
917
918    if let Some(expected) = bundle.receipts.graph_digest.as_deref() {
919        if expected != report.graph_digest {
920            let actual = report.graph_digest.clone();
921            push_error(
922                &mut report,
923                "receipts.graph_digest",
924                format!("graph digest mismatch: expected {expected}, computed {actual}"),
925                None,
926            );
927        }
928    }
929    if let Some(expected_version) = bundle.receipts.workflow_version {
930        if expected_version != canonical.version {
931            push_error(
932                &mut report,
933                "receipts.workflow_version",
934                format!(
935                    "workflow version mismatch: expected {expected_version}, computed {}",
936                    canonical.version
937                ),
938                None,
939            );
940        }
941    }
942
943    report.valid = report.errors.is_empty();
944    report
945}
946
947pub fn preview_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundlePreview {
948    let canonical = canonical_workflow_graph(&bundle.workflow);
949    let validation = validate_workflow_bundle(bundle);
950    let graph = export_workflow_bundle_graph(bundle, &validation);
951    let mermaid = graph.mermaid.clone();
952    let triggers_by_node = triggers_by_node(bundle);
953    let capsules_by_node = capsules_by_node(bundle);
954    let mut nodes = Vec::new();
955
956    for (node_id, node) in &canonical.nodes {
957        let mut outgoing = canonical
958            .edges
959            .iter()
960            .filter(|edge| edge.from == *node_id)
961            .map(|edge| edge.to.clone())
962            .collect::<Vec<_>>();
963        outgoing.sort();
964        outgoing.dedup();
965        nodes.push(WorkflowBundlePreviewNode {
966            id: node_id.clone(),
967            kind: node.kind.clone(),
968            label: node.task_label.clone(),
969            prompt_capsule: capsules_by_node.get(node_id).cloned(),
970            trigger_ids: triggers_by_node.get(node_id).cloned().unwrap_or_default(),
971            outgoing,
972        });
973    }
974
975    WorkflowBundlePreview {
976        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
977        bundle_id: bundle.id.clone(),
978        bundle_version: bundle.version.clone(),
979        workflow_id: canonical.id.clone(),
980        workflow_version: canonical.version,
981        graph_digest: validation.graph_digest.clone(),
982        validation,
983        graph,
984        mermaid,
985        triggers: bundle.triggers.clone(),
986        connectors: bundle.connectors.clone(),
987        environment: bundle.environment.clone(),
988        nodes,
989        edges: sorted_edges(&canonical),
990    }
991}
992
993pub fn export_workflow_bundle_graph(
994    bundle: &WorkflowBundle,
995    validation: &WorkflowBundleValidationReport,
996) -> WorkflowBundleGraphExport {
997    let canonical = canonical_workflow_graph(&bundle.workflow);
998    let mut nodes = Vec::new();
999    let mut edges = Vec::new();
1000    let mut editable_fields = Vec::new();
1001    let capsules_by_node = capsules_by_node(bundle);
1002    let catchup_enabled = bundle.policy.catchup.mode != "none";
1003    let retry_can_dlq = bundle.policy.retry.max_attempts > 1;
1004
1005    for (index, connector) in bundle.connectors.iter().enumerate() {
1006        let node_fields = connector_editable_fields(index, connector);
1007        editable_fields.extend(node_fields.clone());
1008        nodes.push(WorkflowBundleGraphNode {
1009            id: connector_graph_id(&connector.id),
1010            node_type: "connector_call".to_string(),
1011            label: connector_label(connector),
1012            workflow_node_id: None,
1013            trigger_id: None,
1014            connector_id: Some(connector.id.clone()),
1015            editable_fields: node_fields,
1016            metadata: BTreeMap::from([
1017                (
1018                    "provider_id".to_string(),
1019                    serde_json::json!(connector.provider_id),
1020                ),
1021                ("scopes".to_string(), serde_json::json!(connector.scopes)),
1022            ]),
1023        });
1024    }
1025
1026    let catchup_fields = catchup_editable_fields();
1027    let retry_fields = retry_editable_fields();
1028    if catchup_enabled {
1029        let node_fields = catchup_fields.clone();
1030        editable_fields.extend(node_fields.clone());
1031        nodes.push(WorkflowBundleGraphNode {
1032            id: catchup_graph_id(),
1033            node_type: "catchup".to_string(),
1034            label: "Catch up".to_string(),
1035            workflow_node_id: None,
1036            trigger_id: None,
1037            connector_id: None,
1038            editable_fields: node_fields,
1039            metadata: BTreeMap::from([(
1040                "mode".to_string(),
1041                serde_json::json!(bundle.policy.catchup.mode),
1042            )]),
1043        });
1044    } else {
1045        editable_fields.extend(catchup_fields);
1046    }
1047    if retry_can_dlq {
1048        let node_fields = retry_fields.clone();
1049        editable_fields.extend(node_fields.clone());
1050        nodes.push(WorkflowBundleGraphNode {
1051            id: dlq_graph_id(),
1052            node_type: "dlq".to_string(),
1053            label: "Dead letter queue".to_string(),
1054            workflow_node_id: None,
1055            trigger_id: None,
1056            connector_id: None,
1057            editable_fields: node_fields,
1058            metadata: BTreeMap::from([(
1059                "max_attempts".to_string(),
1060                serde_json::json!(bundle.policy.retry.max_attempts),
1061            )]),
1062        });
1063    } else {
1064        editable_fields.extend(retry_fields);
1065    }
1066
1067    for (index, trigger) in bundle.triggers.iter().enumerate() {
1068        let node_fields = trigger_editable_fields(index, trigger);
1069        editable_fields.extend(node_fields.clone());
1070        nodes.push(WorkflowBundleGraphNode {
1071            id: trigger_graph_id(&trigger.id),
1072            node_type: "trigger".to_string(),
1073            label: trigger_label(trigger),
1074            workflow_node_id: trigger.node_id.clone(),
1075            trigger_id: Some(trigger.id.clone()),
1076            connector_id: None,
1077            editable_fields: node_fields,
1078            metadata: BTreeMap::from([
1079                ("kind".to_string(), serde_json::json!(trigger.kind)),
1080                ("provider".to_string(), serde_json::json!(trigger.provider)),
1081                ("events".to_string(), serde_json::json!(trigger.events)),
1082            ]),
1083        });
1084        if let Some(provider) = trigger.provider.as_deref() {
1085            if let Some(connector) = bundle
1086                .connectors
1087                .iter()
1088                .find(|connector| connector.provider_id == provider || connector.id == provider)
1089            {
1090                edges.push(WorkflowBundleGraphEdge {
1091                    from: connector_graph_id(&connector.id),
1092                    to: trigger_graph_id(&trigger.id),
1093                    label: Some("binds".to_string()),
1094                    branch: None,
1095                });
1096            }
1097        }
1098        let target = trigger
1099            .node_id
1100            .clone()
1101            .unwrap_or_else(|| canonical.entry.clone());
1102        if catchup_enabled {
1103            edges.push(WorkflowBundleGraphEdge {
1104                from: trigger_graph_id(&trigger.id),
1105                to: catchup_graph_id(),
1106                label: Some(bundle.policy.catchup.mode.clone()),
1107                branch: Some("catchup".to_string()),
1108            });
1109            edges.push(WorkflowBundleGraphEdge {
1110                from: catchup_graph_id(),
1111                to: workflow_graph_id(&target),
1112                label: Some("dispatch".to_string()),
1113                branch: None,
1114            });
1115        } else {
1116            edges.push(WorkflowBundleGraphEdge {
1117                from: trigger_graph_id(&trigger.id),
1118                to: workflow_graph_id(&target),
1119                label: Some("dispatch".to_string()),
1120                branch: None,
1121            });
1122        }
1123    }
1124
1125    for (node_id, node) in &canonical.nodes {
1126        let capsule_id = capsules_by_node.get(node_id);
1127        let node_fields = workflow_node_editable_fields(node_id, capsule_id);
1128        editable_fields.extend(node_fields.clone());
1129        nodes.push(WorkflowBundleGraphNode {
1130            id: workflow_graph_id(node_id),
1131            node_type: workflow_node_type(&node.kind),
1132            label: workflow_node_label(node_id, node),
1133            workflow_node_id: Some(node_id.clone()),
1134            trigger_id: None,
1135            connector_id: None,
1136            editable_fields: node_fields,
1137            metadata: BTreeMap::from([
1138                ("kind".to_string(), serde_json::json!(node.kind)),
1139                ("task_label".to_string(), serde_json::json!(node.task_label)),
1140                (
1141                    "prompt_capsule".to_string(),
1142                    serde_json::json!(capsule_id.cloned()),
1143                ),
1144            ]),
1145        });
1146    }
1147
1148    for edge in sorted_edges(&canonical) {
1149        edges.push(WorkflowBundleGraphEdge {
1150            from: workflow_graph_id(&edge.from),
1151            to: workflow_graph_id(&edge.to),
1152            label: edge.label.clone(),
1153            branch: edge.branch.clone(),
1154        });
1155    }
1156
1157    let outgoing: BTreeSet<&str> = canonical
1158        .edges
1159        .iter()
1160        .map(|edge| edge.from.as_str())
1161        .collect();
1162    for node_id in canonical.nodes.keys() {
1163        if !outgoing.contains(node_id.as_str()) {
1164            edges.push(WorkflowBundleGraphEdge {
1165                from: workflow_graph_id(node_id),
1166                to: terminal_completed_graph_id(),
1167                label: Some("completed".to_string()),
1168                branch: Some("completed".to_string()),
1169            });
1170        }
1171        if retry_can_dlq {
1172            edges.push(WorkflowBundleGraphEdge {
1173                from: workflow_graph_id(node_id),
1174                to: dlq_graph_id(),
1175                label: Some("retry exhausted".to_string()),
1176                branch: Some("failed".to_string()),
1177            });
1178        }
1179    }
1180
1181    nodes.push(WorkflowBundleGraphNode {
1182        id: terminal_completed_graph_id(),
1183        node_type: "terminal".to_string(),
1184        label: "Completed".to_string(),
1185        workflow_node_id: None,
1186        trigger_id: None,
1187        connector_id: None,
1188        editable_fields: Vec::new(),
1189        metadata: BTreeMap::from([("status".to_string(), serde_json::json!("completed"))]),
1190    });
1191    nodes.push(WorkflowBundleGraphNode {
1192        id: terminal_failed_graph_id(),
1193        node_type: "terminal".to_string(),
1194        label: "Failed".to_string(),
1195        workflow_node_id: None,
1196        trigger_id: None,
1197        connector_id: None,
1198        editable_fields: Vec::new(),
1199        metadata: BTreeMap::from([("status".to_string(), serde_json::json!("failed"))]),
1200    });
1201    if retry_can_dlq {
1202        edges.push(WorkflowBundleGraphEdge {
1203            from: dlq_graph_id(),
1204            to: terminal_failed_graph_id(),
1205            label: Some("failed".to_string()),
1206            branch: Some("failed".to_string()),
1207        });
1208    }
1209
1210    nodes.sort_by(|left, right| left.id.cmp(&right.id));
1211    edges.sort_by(|left, right| {
1212        (&left.from, &left.to, &left.branch, &left.label).cmp(&(
1213            &right.from,
1214            &right.to,
1215            &right.branch,
1216            &right.label,
1217        ))
1218    });
1219    editable_fields.sort_by(|left, right| left.id.cmp(&right.id));
1220
1221    let diagnostics = validation
1222        .errors
1223        .iter()
1224        .chain(validation.warnings.iter())
1225        .map(|diagnostic| WorkflowBundleGraphDiagnostic {
1226            severity: diagnostic.severity.clone(),
1227            path: diagnostic.path.clone(),
1228            message: diagnostic.message.clone(),
1229            node_id: diagnostic.node_id.clone(),
1230            graph_node_id: diagnostic.node_id.as_deref().map(workflow_graph_id),
1231        })
1232        .collect::<Vec<_>>();
1233    let mermaid = render_workflow_bundle_mermaid(&nodes, &edges);
1234
1235    WorkflowBundleGraphExport {
1236        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1237        graph_id: canonical.id,
1238        graph_digest: validation.graph_digest.clone(),
1239        nodes,
1240        edges,
1241        diagnostics,
1242        editable_fields,
1243        mermaid,
1244    }
1245}
1246
1247pub fn run_workflow_bundle(
1248    bundle: &WorkflowBundle,
1249    request: WorkflowBundleRunRequest,
1250) -> Result<WorkflowBundleRunReceipt, WorkflowBundleValidationReport> {
1251    let validation = validate_workflow_bundle(bundle);
1252    if !validation.valid {
1253        return Err(validation);
1254    }
1255
1256    let canonical = canonical_workflow_graph(&bundle.workflow);
1257    let trigger_id = match request.trigger_id {
1258        Some(trigger_id)
1259            if !bundle
1260                .triggers
1261                .iter()
1262                .any(|trigger| trigger.id == trigger_id) =>
1263        {
1264            let mut report = validation;
1265            push_error(
1266                &mut report,
1267                "trigger_id",
1268                format!("unknown trigger id: {trigger_id}"),
1269                None,
1270            );
1271            report.valid = false;
1272            return Err(report);
1273        }
1274        Some(trigger_id) => Some(trigger_id),
1275        None => bundle.triggers.first().map(|trigger| trigger.id.clone()),
1276    };
1277    let mut event_ids = bundle.receipts.event_ids.clone();
1278    if let Some(event_id) = request.event_id {
1279        if !event_ids.contains(&event_id) {
1280            event_ids.push(event_id);
1281        }
1282    }
1283    let run_id = bundle
1284        .receipts
1285        .run_id
1286        .clone()
1287        .unwrap_or_else(|| default_run_id(bundle, &validation.graph_digest));
1288    let capsules_by_node = capsules_by_node(bundle);
1289    let executed_nodes = execution_order(&canonical)
1290        .into_iter()
1291        .map(|node_id| {
1292            let node = canonical
1293                .nodes
1294                .get(&node_id)
1295                .expect("execution order only contains known nodes");
1296            WorkflowBundleRunNodeReceipt {
1297                node_id: node_id.clone(),
1298                kind: node.kind.clone(),
1299                prompt_capsule: capsules_by_node.get(&node_id).cloned(),
1300                status: "completed".to_string(),
1301            }
1302        })
1303        .collect();
1304
1305    Ok(WorkflowBundleRunReceipt {
1306        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1307        receipt_type: WORKFLOW_BUNDLE_RECEIPT_TYPE.to_string(),
1308        bundle_id: bundle.id.clone(),
1309        bundle_version: bundle.version.clone(),
1310        workflow_id: canonical.id,
1311        workflow_version: canonical.version,
1312        graph_digest: validation.graph_digest,
1313        run_id,
1314        trigger_id,
1315        event_ids,
1316        status: "completed".to_string(),
1317        executed_nodes,
1318        policy: bundle.policy.clone(),
1319        connectors: bundle.connectors.clone(),
1320        environment: bundle.environment.clone(),
1321    })
1322}
1323
1324fn canonical_workflow_graph(graph: &WorkflowGraph) -> WorkflowGraph {
1325    let mut canonical = graph.clone();
1326    if canonical.type_name.is_empty() {
1327        canonical.type_name = "workflow_graph".to_string();
1328    }
1329    if canonical.version == 0 {
1330        canonical.version = 1;
1331    }
1332    if canonical.entry.is_empty() {
1333        canonical.entry = canonical.nodes.keys().next().cloned().unwrap_or_default();
1334    }
1335    for (node_id, node) in &mut canonical.nodes {
1336        if node.id.is_none() {
1337            node.id = Some(node_id.clone());
1338        }
1339        if node.kind.is_empty() {
1340            node.kind = "stage".to_string();
1341        }
1342        if node.retry_policy.max_attempts == 0 {
1343            node.retry_policy.max_attempts = 1;
1344        }
1345    }
1346    canonical.edges = sorted_edges(&canonical);
1347    canonical
1348}
1349
1350fn sorted_edges(graph: &WorkflowGraph) -> Vec<WorkflowEdge> {
1351    let mut edges = graph.edges.clone();
1352    edges.sort_by(|left, right| {
1353        (
1354            &left.from,
1355            &left.to,
1356            left.branch.as_deref(),
1357            left.label.as_deref(),
1358        )
1359            .cmp(&(
1360                &right.from,
1361                &right.to,
1362                right.branch.as_deref(),
1363                right.label.as_deref(),
1364            ))
1365    });
1366    edges
1367}
1368
1369fn validate_bundle_identity(
1370    bundle: &WorkflowBundle,
1371    graph: &WorkflowGraph,
1372    report: &mut WorkflowBundleValidationReport,
1373) {
1374    if bundle.schema_version != WORKFLOW_BUNDLE_SCHEMA_VERSION {
1375        push_error(
1376            report,
1377            "schema_version",
1378            format!(
1379                "unsupported schema_version {}; expected {}",
1380                bundle.schema_version, WORKFLOW_BUNDLE_SCHEMA_VERSION
1381            ),
1382            None,
1383        );
1384    }
1385    if bundle.id.trim().is_empty() {
1386        push_error(report, "id", "bundle id is required", None);
1387    }
1388    if bundle.version.trim().is_empty() {
1389        push_error(report, "version", "bundle version is required", None);
1390    }
1391    if graph.id.trim().is_empty() {
1392        push_error(
1393            report,
1394            "workflow.id",
1395            "workflow id is required for portable bundles",
1396            None,
1397        );
1398    }
1399    if graph.nodes.is_empty() {
1400        push_error(
1401            report,
1402            "workflow.nodes",
1403            "workflow must contain nodes",
1404            None,
1405        );
1406    }
1407    for (node_id, node) in &graph.nodes {
1408        if node_id.trim().is_empty() {
1409            push_error(report, "workflow.nodes", "node id is required", None);
1410        }
1411        if node.id.as_deref().is_some_and(|id| id != node_id) {
1412            push_error(
1413                report,
1414                format!("workflow.nodes.{node_id}.id"),
1415                "node id field must match its map key",
1416                Some(node_id.clone()),
1417            );
1418        }
1419    }
1420}
1421
1422fn validate_manifest_contract(
1423    bundle: &WorkflowBundle,
1424    report: &mut WorkflowBundleValidationReport,
1425) {
1426    validate_relative_path(
1427        report,
1428        "entrypoint",
1429        &bundle.entrypoint,
1430        "entrypoint is required",
1431    );
1432    if bundle.transitive_modules.is_empty() {
1433        push_error(
1434            report,
1435            "transitive_modules",
1436            "at least one transitive module entry is required",
1437            None,
1438        );
1439    }
1440    let mut module_paths = BTreeSet::new();
1441    for (index, module) in bundle.transitive_modules.iter().enumerate() {
1442        let path = format!("transitive_modules[{index}]");
1443        validate_relative_path(
1444            report,
1445            format!("{path}.path"),
1446            &module.path,
1447            "module path is required",
1448        );
1449        if !module.path.as_os_str().is_empty() && !module_paths.insert(path_sort_key(&module.path))
1450        {
1451            push_error(
1452                report,
1453                format!("{path}.path"),
1454                format!(
1455                    "duplicate transitive module path: {}",
1456                    module.path.display()
1457                ),
1458                None,
1459            );
1460        }
1461        validate_blake3_hash(
1462            report,
1463            format!("{path}.source_hash_blake3"),
1464            &module.source_hash_blake3,
1465            true,
1466        );
1467        validate_blake3_hash(
1468            report,
1469            format!("{path}.harnbc_hash_blake3"),
1470            &module.harnbc_hash_blake3,
1471            true,
1472        );
1473    }
1474    if bundle.stdlib_version.trim().is_empty() {
1475        push_error(report, "stdlib_version", "stdlib_version is required", None);
1476    }
1477    if bundle.harn_version.trim().is_empty() {
1478        push_error(report, "harn_version", "harn_version is required", None);
1479    }
1480    validate_blake3_hash(
1481        report,
1482        "provider_catalog_hash",
1483        &bundle.provider_catalog_hash,
1484        true,
1485    );
1486
1487    let mut tool_names = BTreeSet::new();
1488    for (index, tool) in bundle.tool_manifest.iter().enumerate() {
1489        let path = format!("tool_manifest[{index}]");
1490        if tool.name.trim().is_empty() {
1491            push_error(
1492                report,
1493                format!("{path}.name"),
1494                "tool manifest entry name is required",
1495                None,
1496            );
1497        } else if !tool_names.insert(tool.name.clone()) {
1498            push_error(
1499                report,
1500                format!("{path}.name"),
1501                format!("duplicate tool manifest entry: {}", tool.name),
1502                None,
1503            );
1504        }
1505        if let Some(hash) = tool.schema_hash_blake3.as_deref() {
1506            validate_blake3_hash(report, format!("{path}.schema_hash_blake3"), hash, true);
1507        }
1508    }
1509
1510    if bundle.sbom.format.trim().is_empty() {
1511        push_error(report, "sbom.format", "SBOM format is required", None);
1512    }
1513    if bundle.sbom.version.trim().is_empty() {
1514        push_error(report, "sbom.version", "SBOM version is required", None);
1515    }
1516    let mut sbom_packages = BTreeSet::new();
1517    for (index, package) in bundle.sbom.packages.iter().enumerate() {
1518        let path = format!("sbom.packages[{index}]");
1519        if package.name.trim().is_empty() {
1520            push_error(
1521                report,
1522                format!("{path}.name"),
1523                "SBOM package name is required",
1524                None,
1525            );
1526        } else if !sbom_packages.insert(package.name.clone()) {
1527            push_error(
1528                report,
1529                format!("{path}.name"),
1530                format!("duplicate SBOM package: {}", package.name),
1531                None,
1532            );
1533        }
1534        if let Some(hash) = package.package_hash_blake3.as_deref() {
1535            validate_blake3_hash(report, format!("{path}.package_hash_blake3"), hash, true);
1536        }
1537    }
1538    for (index, relationship) in bundle.sbom.relationships.iter().enumerate() {
1539        let path = format!("sbom.relationships[{index}]");
1540        if relationship.from.trim().is_empty() {
1541            push_error(
1542                report,
1543                format!("{path}.from"),
1544                "SBOM relationship source is required",
1545                None,
1546            );
1547        }
1548        if relationship.to.trim().is_empty() {
1549            push_error(
1550                report,
1551                format!("{path}.to"),
1552                "SBOM relationship target is required",
1553                None,
1554            );
1555        }
1556        if relationship.relationship_type.trim().is_empty() {
1557            push_error(
1558                report,
1559                format!("{path}.relationship_type"),
1560                "SBOM relationship type is required",
1561                None,
1562            );
1563        }
1564    }
1565
1566    if let Some(parent_id) = bundle.parent_trust_record_id.as_deref() {
1567        if parent_id.trim().is_empty() {
1568            push_error(
1569                report,
1570                "parent_trust_record_id",
1571                "parent_trust_record_id cannot be empty when present",
1572                None,
1573            );
1574        }
1575    }
1576    if let Some(signature) = &bundle.signature {
1577        if signature.algorithm.trim().is_empty() {
1578            push_error(
1579                report,
1580                "signature.algorithm",
1581                "signature algorithm is required",
1582                None,
1583            );
1584        } else if signature.algorithm != "ed25519" {
1585            push_error(
1586                report,
1587                "signature.algorithm",
1588                "signature algorithm must be ed25519",
1589                None,
1590            );
1591        }
1592        if signature.public_key.trim().is_empty() {
1593            push_error(
1594                report,
1595                "signature.public_key",
1596                "signature public_key is required",
1597                None,
1598            );
1599        }
1600        if signature.signature.trim().is_empty() {
1601            push_error(
1602                report,
1603                "signature.signature",
1604                "signature value is required",
1605                None,
1606            );
1607        }
1608        validate_blake3_hash(
1609            report,
1610            "signature.manifest_hash_blake3",
1611            &signature.manifest_hash_blake3,
1612            true,
1613        );
1614    }
1615}
1616
1617fn validate_relative_path(
1618    report: &mut WorkflowBundleValidationReport,
1619    path: impl Into<String>,
1620    value: &Path,
1621    empty_message: &str,
1622) {
1623    let path = path.into();
1624    if value.as_os_str().is_empty() {
1625        push_error(report, path, empty_message, None);
1626        return;
1627    }
1628    if normalize_archive_path(value).is_err() {
1629        push_error(
1630            report,
1631            path,
1632            format!("path must be relative and contained: {}", value.display()),
1633            None,
1634        );
1635    }
1636}
1637
1638fn validate_blake3_hash(
1639    report: &mut WorkflowBundleValidationReport,
1640    path: impl Into<String>,
1641    value: &str,
1642    required: bool,
1643) {
1644    let path = path.into();
1645    if value.trim().is_empty() {
1646        if required {
1647            push_error(report, path, "BLAKE3 hash is required", None);
1648        }
1649        return;
1650    }
1651    let Some(hex) = value.strip_prefix("blake3:") else {
1652        push_error(report, path, "BLAKE3 hash must use blake3:<hex>", None);
1653        return;
1654    };
1655    if hex.len() != 64
1656        || !hex
1657            .bytes()
1658            .all(|byte| byte.is_ascii_digit() || matches!(byte, b'a'..=b'f'))
1659    {
1660        push_error(
1661            report,
1662            path,
1663            "BLAKE3 hash must contain 64 lowercase hex digits",
1664            None,
1665        );
1666    }
1667}
1668
1669fn validate_triggers(
1670    bundle: &WorkflowBundle,
1671    graph: &WorkflowGraph,
1672    report: &mut WorkflowBundleValidationReport,
1673) {
1674    if bundle.triggers.is_empty() {
1675        push_warning(report, "triggers", "bundle declares no triggers", None);
1676    }
1677    let mut ids = BTreeSet::new();
1678    for (index, trigger) in bundle.triggers.iter().enumerate() {
1679        let path = format!("triggers[{index}]");
1680        if trigger.id.trim().is_empty() {
1681            push_error(report, format!("{path}.id"), "trigger id is required", None);
1682        } else if !ids.insert(trigger.id.clone()) {
1683            push_error(
1684                report,
1685                format!("{path}.id"),
1686                format!("duplicate trigger id: {}", trigger.id),
1687                None,
1688            );
1689        }
1690        match trigger.kind.as_str() {
1691            "github" => {
1692                if trigger.provider.as_deref() != Some("github") {
1693                    push_error(
1694                        report,
1695                        format!("{path}.provider"),
1696                        "github triggers require provider=\"github\"",
1697                        None,
1698                    );
1699                }
1700                if trigger.events.is_empty() {
1701                    push_error(
1702                        report,
1703                        format!("{path}.events"),
1704                        "github triggers require at least one event",
1705                        None,
1706                    );
1707                }
1708            }
1709            "cron" if trigger.schedule.is_none() => push_error(
1710                report,
1711                format!("{path}.schedule"),
1712                "cron triggers require schedule",
1713                None,
1714            ),
1715            "delay" if trigger.delay.is_none() => push_error(
1716                report,
1717                format!("{path}.delay"),
1718                "delay triggers require delay",
1719                None,
1720            ),
1721            "webhook" if trigger.webhook_path.is_none() => push_error(
1722                report,
1723                format!("{path}.webhook_path"),
1724                "webhook triggers require webhook_path",
1725                None,
1726            ),
1727            "mcp" if trigger.mcp_tool.is_none() => push_error(
1728                report,
1729                format!("{path}.mcp_tool"),
1730                "mcp triggers require mcp_tool",
1731                None,
1732            ),
1733            "manual" => {}
1734            "" => push_error(
1735                report,
1736                format!("{path}.kind"),
1737                "trigger kind is required",
1738                None,
1739            ),
1740            other
1741                if !matches!(
1742                    other,
1743                    "github" | "cron" | "delay" | "webhook" | "mcp" | "manual"
1744                ) =>
1745            {
1746                push_error(
1747                    report,
1748                    format!("{path}.kind"),
1749                    format!("unsupported trigger kind: {other}"),
1750                    None,
1751                );
1752            }
1753            _ => {}
1754        }
1755        if let Some(node_id) = trigger.node_id.as_deref() {
1756            if !graph.nodes.contains_key(node_id) {
1757                push_error(
1758                    report,
1759                    format!("{path}.node_id"),
1760                    format!("trigger references unknown node: {node_id}"),
1761                    Some(node_id.to_string()),
1762                );
1763            }
1764        }
1765    }
1766}
1767
1768fn validate_prompt_capsules(
1769    bundle: &WorkflowBundle,
1770    graph: &WorkflowGraph,
1771    report: &mut WorkflowBundleValidationReport,
1772) {
1773    let trigger_ids: BTreeSet<&str> = bundle
1774        .triggers
1775        .iter()
1776        .map(|trigger| trigger.id.as_str())
1777        .collect();
1778    let mut node_refs = BTreeSet::new();
1779    for (key, capsule) in &bundle.prompt_capsules {
1780        let path = format!("prompt_capsules.{key}");
1781        if capsule.id.trim().is_empty() {
1782            push_error(
1783                report,
1784                format!("{path}.id"),
1785                "prompt capsule id is required",
1786                None,
1787            );
1788        } else if capsule.id != *key {
1789            push_error(
1790                report,
1791                format!("{path}.id"),
1792                "prompt capsule id must match its map key",
1793                None,
1794            );
1795        }
1796        if capsule.prompt.trim().is_empty() {
1797            push_error(
1798                report,
1799                format!("{path}.prompt"),
1800                "prompt capsule prompt is required",
1801                Some(capsule.node_id.clone()),
1802            );
1803        }
1804        if !graph.nodes.contains_key(&capsule.node_id) {
1805            push_error(
1806                report,
1807                format!("{path}.node_id"),
1808                format!(
1809                    "prompt capsule references unknown node: {}",
1810                    capsule.node_id
1811                ),
1812                Some(capsule.node_id.clone()),
1813            );
1814        }
1815        if !capsule.node_id.is_empty() && !node_refs.insert(capsule.node_id.clone()) {
1816            push_error(
1817                report,
1818                format!("{path}.node_id"),
1819                format!("multiple prompt capsules target node {}", capsule.node_id),
1820                Some(capsule.node_id.clone()),
1821            );
1822        }
1823        if let Some(trigger_id) = capsule.trigger_id.as_deref() {
1824            if !trigger_ids.contains(trigger_id) {
1825                push_error(
1826                    report,
1827                    format!("{path}.trigger_id"),
1828                    format!("prompt capsule references unknown trigger: {trigger_id}"),
1829                    Some(capsule.node_id.clone()),
1830                );
1831            }
1832        }
1833    }
1834}
1835
1836fn validate_policy(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1837    if !matches!(
1838        bundle.policy.autonomy_tier.as_str(),
1839        "shadow" | "suggest" | "act_with_approval" | "act_auto"
1840    ) {
1841        push_error(
1842            report,
1843            "policy.autonomy_tier",
1844            "autonomy_tier must be shadow, suggest, act_with_approval, or act_auto",
1845            None,
1846        );
1847    }
1848    if bundle.policy.retry.max_attempts == 0 {
1849        push_error(
1850            report,
1851            "policy.retry.max_attempts",
1852            "retry.max_attempts must be at least 1",
1853            None,
1854        );
1855    }
1856    if !matches!(
1857        bundle.policy.catchup.mode.as_str(),
1858        "none" | "latest" | "all"
1859    ) {
1860        push_error(
1861            report,
1862            "policy.catchup.mode",
1863            "catchup.mode must be none, latest, or all",
1864            None,
1865        );
1866    }
1867}
1868
1869fn validate_connectors(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1870    let mut ids = BTreeSet::new();
1871    let provider_ids: BTreeSet<&str> = bundle
1872        .connectors
1873        .iter()
1874        .map(|connector| connector.provider_id.as_str())
1875        .collect();
1876    for (index, connector) in bundle.connectors.iter().enumerate() {
1877        let path = format!("connectors[{index}]");
1878        if connector.id.trim().is_empty() {
1879            push_error(
1880                report,
1881                format!("{path}.id"),
1882                "connector id is required",
1883                None,
1884            );
1885        } else if !ids.insert(connector.id.clone()) {
1886            push_error(
1887                report,
1888                format!("{path}.id"),
1889                format!("duplicate connector id: {}", connector.id),
1890                None,
1891            );
1892        }
1893        if connector.provider_id.trim().is_empty() {
1894            push_error(
1895                report,
1896                format!("{path}.provider_id"),
1897                "connector provider_id is required",
1898                None,
1899            );
1900        }
1901    }
1902    for trigger in &bundle.triggers {
1903        if let Some(provider) = trigger.provider.as_deref() {
1904            if !provider_ids.contains(provider) {
1905                push_warning(
1906                    report,
1907                    "connectors",
1908                    format!(
1909                        "trigger {} references provider {provider} with no connector requirement",
1910                        trigger.id
1911                    ),
1912                    trigger.node_id.clone(),
1913                );
1914            }
1915        }
1916    }
1917}
1918
1919fn validate_environment(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1920    if !matches!(
1921        bundle.environment.worktree_policy.as_str(),
1922        "reuse_current" | "new_worktree" | "host_managed"
1923    ) {
1924        push_error(
1925            report,
1926            "environment.worktree_policy",
1927            "worktree_policy must be reuse_current, new_worktree, or host_managed",
1928            None,
1929        );
1930    }
1931}
1932
1933fn workflow_diagnostic_node_id(message: &str, graph: &WorkflowGraph) -> Option<String> {
1934    for prefix in [
1935        "node is unreachable: ",
1936        "edge.from references unknown node: ",
1937        "edge.to references unknown node: ",
1938        "entry node does not exist: ",
1939    ] {
1940        if let Some(node_id) = message.strip_prefix(prefix) {
1941            return Some(node_id.to_string());
1942        }
1943    }
1944    if let Some(rest) = message.strip_prefix("node ") {
1945        if let Some((node_id, _)) = rest.split_once(':') {
1946            return Some(node_id.to_string());
1947        }
1948    }
1949    graph
1950        .nodes
1951        .keys()
1952        .find(|node_id| message.contains(&format!("node {node_id}:")))
1953        .cloned()
1954}
1955
/// Builds the graph identifier for a workflow node: `node/<node_id>`.
fn workflow_graph_id(node_id: &str) -> String {
    ["node", node_id].join("/")
}
1959
/// Builds the graph identifier for a trigger: `trigger/<trigger_id>`.
fn trigger_graph_id(trigger_id: &str) -> String {
    let mut id = String::from("trigger/");
    id.push_str(trigger_id);
    id
}
1963
/// Builds the graph identifier for a connector: `connector/<connector_id>`.
fn connector_graph_id(connector_id: &str) -> String {
    "connector/".to_owned() + connector_id
}
1967
/// Returns the fixed graph identifier `policy/catchup`.
fn catchup_graph_id() -> String {
    String::from("policy/catchup")
}
1971
/// Returns the fixed graph identifier `policy/dlq`.
fn dlq_graph_id() -> String {
    String::from("policy/dlq")
}
1975
/// Returns the fixed graph identifier `terminal/completed`.
fn terminal_completed_graph_id() -> String {
    String::from("terminal/completed")
}
1979
/// Returns the fixed graph identifier `terminal/failed`.
fn terminal_failed_graph_id() -> String {
    String::from("terminal/failed")
}
1983
/// Maps a workflow node `kind` onto the node type used in the rendered graph.
///
/// Known aliases collapse onto one canonical name (for example `stage` and
/// `agent` both become `agent`), a blank kind defaults to `agent`, and any
/// other kind is passed through.
///
/// Surrounding whitespace is ignored for every kind, not only blank ones, so a
/// padded alias such as `" stage "` is normalized instead of being echoed back
/// with its padding.
fn workflow_node_type(kind: &str) -> String {
    let canonical = match kind.trim() {
        "" | "stage" | "agent" => "agent",
        "action" => "action",
        "subagent" | "worker" => "subagent",
        "wait" | "waitpoint" | "delay" => "wait",
        "approval" | "hitl" => "approval",
        "connector" | "connector_call" => "connector_call",
        "notification" | "notify" => "notification",
        "terminal" | "success" | "failure" => "terminal",
        other => other,
    };
    canonical.to_string()
}
1999
2000fn workflow_node_label(node_id: &str, node: &super::WorkflowNode) -> String {
2001    node.task_label
2002        .clone()
2003        .or_else(|| node.prompt.clone())
2004        .map(|label| label.trim().to_string())
2005        .filter(|label| !label.is_empty())
2006        .unwrap_or_else(|| node_id.to_string())
2007}
2008
2009fn trigger_label(trigger: &WorkflowBundleTrigger) -> String {
2010    if !trigger.events.is_empty() {
2011        format!("{}: {}", trigger.kind, trigger.events.join(", "))
2012    } else if let Some(schedule) = trigger.schedule.as_deref() {
2013        format!("cron: {schedule}")
2014    } else if let Some(delay) = trigger.delay.as_deref() {
2015        format!("delay: {delay}")
2016    } else {
2017        trigger.id.clone()
2018    }
2019}
2020
2021fn connector_label(connector: &ConnectorRequirement) -> String {
2022    if connector.provider_id.is_empty() || connector.provider_id == connector.id {
2023        connector.id.clone()
2024    } else {
2025        format!("{} ({})", connector.id, connector.provider_id)
2026    }
2027}
2028
2029fn editable_field(
2030    id: impl Into<String>,
2031    label: impl Into<String>,
2032    json_pointer: impl Into<String>,
2033    value_type: impl Into<String>,
2034    required: bool,
2035    enum_values: &[&str],
2036) -> WorkflowBundleEditableField {
2037    WorkflowBundleEditableField {
2038        id: id.into(),
2039        label: label.into(),
2040        json_pointer: json_pointer.into(),
2041        value_type: value_type.into(),
2042        required,
2043        enum_values: enum_values
2044            .iter()
2045            .map(|value| (*value).to_string())
2046            .collect(),
2047    }
2048}
2049
/// Escapes `value` for use as a single JSON Pointer reference token:
/// `~` becomes `~0` and `/` becomes `~1`.
fn json_pointer_segment(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '~' => escaped.push_str("~0"),
            '/' => escaped.push_str("~1"),
            other => escaped.push(other),
        }
    }
    escaped
}
2053
2054fn trigger_editable_fields(
2055    index: usize,
2056    trigger: &WorkflowBundleTrigger,
2057) -> Vec<WorkflowBundleEditableField> {
2058    let base = format!("/triggers/{index}");
2059    let mut fields = vec![
2060        editable_field(
2061            format!("trigger.{}.kind", trigger.id),
2062            "Trigger kind",
2063            format!("{base}/kind"),
2064            "enum",
2065            true,
2066            &["github", "cron", "delay", "manual", "webhook", "mcp"],
2067        ),
2068        editable_field(
2069            format!("trigger.{}.node_id", trigger.id),
2070            "Target node",
2071            format!("{base}/node_id"),
2072            "string",
2073            false,
2074            &[],
2075        ),
2076    ];
2077    if trigger.provider.is_some() || trigger.kind == "github" {
2078        fields.push(editable_field(
2079            format!("trigger.{}.provider", trigger.id),
2080            "Provider",
2081            format!("{base}/provider"),
2082            "string",
2083            trigger.kind == "github",
2084            &[],
2085        ));
2086    }
2087    for (field, label, value_type) in [
2088        ("events", "Events", "list"),
2089        ("schedule", "Schedule", "string"),
2090        ("delay", "Delay", "string"),
2091        ("webhook_path", "Webhook path", "string"),
2092        ("mcp_tool", "MCP tool", "string"),
2093        ("resume_key", "Resume key", "string"),
2094        ("metadata", "Metadata", "object"),
2095    ] {
2096        fields.push(editable_field(
2097            format!("trigger.{}.{}", trigger.id, field),
2098            label,
2099            format!("{base}/{field}"),
2100            value_type,
2101            false,
2102            &[],
2103        ));
2104    }
2105    fields
2106}
2107
2108fn workflow_node_editable_fields(
2109    node_id: &str,
2110    capsule_id: Option<&String>,
2111) -> Vec<WorkflowBundleEditableField> {
2112    let escaped_node = json_pointer_segment(node_id);
2113    let mut fields = vec![
2114        editable_field(
2115            format!("workflow.{node_id}.task_label"),
2116            "Task label",
2117            format!("/workflow/nodes/{escaped_node}/task_label"),
2118            "string",
2119            false,
2120            &[],
2121        ),
2122        editable_field(
2123            format!("workflow.{node_id}.prompt"),
2124            "Prompt",
2125            format!("/workflow/nodes/{escaped_node}/prompt"),
2126            "string",
2127            false,
2128            &[],
2129        ),
2130        editable_field(
2131            format!("workflow.{node_id}.system"),
2132            "System prompt",
2133            format!("/workflow/nodes/{escaped_node}/system"),
2134            "string",
2135            false,
2136            &[],
2137        ),
2138        editable_field(
2139            format!("workflow.{node_id}.model_policy"),
2140            "Model policy",
2141            format!("/workflow/nodes/{escaped_node}/model_policy"),
2142            "object",
2143            false,
2144            &[],
2145        ),
2146        editable_field(
2147            format!("workflow.{node_id}.tools"),
2148            "Tool policy",
2149            format!("/workflow/nodes/{escaped_node}/tools"),
2150            "any",
2151            false,
2152            &[],
2153        ),
2154        editable_field(
2155            format!("workflow.{node_id}.capability_policy"),
2156            "Capability policy",
2157            format!("/workflow/nodes/{escaped_node}/capability_policy"),
2158            "object",
2159            false,
2160            &[],
2161        ),
2162        editable_field(
2163            format!("workflow.{node_id}.approval_policy"),
2164            "Approval policy",
2165            format!("/workflow/nodes/{escaped_node}/approval_policy"),
2166            "object",
2167            false,
2168            &[],
2169        ),
2170        editable_field(
2171            format!("workflow.{node_id}.retry_policy"),
2172            "Retry policy",
2173            format!("/workflow/nodes/{escaped_node}/retry_policy"),
2174            "object",
2175            false,
2176            &[],
2177        ),
2178    ];
2179    if let Some(capsule_id) = capsule_id {
2180        let escaped_capsule = json_pointer_segment(capsule_id);
2181        fields.extend([
2182            editable_field(
2183                format!("prompt_capsule.{capsule_id}.prompt"),
2184                "Prompt capsule",
2185                format!("/prompt_capsules/{escaped_capsule}/prompt"),
2186                "string",
2187                true,
2188                &[],
2189            ),
2190            editable_field(
2191                format!("prompt_capsule.{capsule_id}.system"),
2192                "Prompt capsule system",
2193                format!("/prompt_capsules/{escaped_capsule}/system"),
2194                "string",
2195                false,
2196                &[],
2197            ),
2198            editable_field(
2199                format!("prompt_capsule.{capsule_id}.context"),
2200                "Prompt capsule context",
2201                format!("/prompt_capsules/{escaped_capsule}/context"),
2202                "object",
2203                false,
2204                &[],
2205            ),
2206            editable_field(
2207                format!("prompt_capsule.{capsule_id}.trigger_id"),
2208                "Prompt capsule trigger",
2209                format!("/prompt_capsules/{escaped_capsule}/trigger_id"),
2210                "string",
2211                false,
2212                &[],
2213            ),
2214        ]);
2215    }
2216    fields
2217}
2218
2219fn connector_editable_fields(
2220    index: usize,
2221    connector: &ConnectorRequirement,
2222) -> Vec<WorkflowBundleEditableField> {
2223    let base = format!("/connectors/{index}");
2224    [
2225        ("id", "Connector id", "string", true),
2226        ("provider_id", "Provider id", "string", true),
2227        ("scopes", "Scopes", "list", false),
2228        ("setup_required", "Setup required", "bool", false),
2229        ("status_required", "Status required", "bool", false),
2230    ]
2231    .into_iter()
2232    .map(|(field, label, value_type, required)| {
2233        editable_field(
2234            format!("connector.{}.{}", connector.id, field),
2235            label,
2236            format!("{base}/{field}"),
2237            value_type,
2238            required,
2239            &[],
2240        )
2241    })
2242    .collect()
2243}
2244
2245fn retry_editable_fields() -> Vec<WorkflowBundleEditableField> {
2246    vec![
2247        editable_field(
2248            "policy.retry.max_attempts",
2249            "Retry attempts",
2250            "/policy/retry/max_attempts",
2251            "integer",
2252            true,
2253            &[],
2254        ),
2255        editable_field(
2256            "policy.retry.backoff",
2257            "Retry backoff",
2258            "/policy/retry/backoff",
2259            "string",
2260            true,
2261            &[],
2262        ),
2263    ]
2264}
2265
2266fn catchup_editable_fields() -> Vec<WorkflowBundleEditableField> {
2267    vec![
2268        editable_field(
2269            "policy.catchup.mode",
2270            "Catchup mode",
2271            "/policy/catchup/mode",
2272            "enum",
2273            true,
2274            &["none", "latest", "all"],
2275        ),
2276        editable_field(
2277            "policy.catchup.max_events",
2278            "Catchup max events",
2279            "/policy/catchup/max_events",
2280            "integer",
2281            false,
2282            &[],
2283        ),
2284    ]
2285}
2286
2287fn render_workflow_bundle_mermaid(
2288    nodes: &[WorkflowBundleGraphNode],
2289    edges: &[WorkflowBundleGraphEdge],
2290) -> String {
2291    let mut lines = vec!["flowchart TD".to_string()];
2292    for node in nodes {
2293        lines.push(format!(
2294            "  {}[\"{}\"]",
2295            mermaid_id(&node.id),
2296            mermaid_label(&format!("{}: {}", node.node_type, node.label))
2297        ));
2298    }
2299    for edge in edges {
2300        let label = edge
2301            .label
2302            .as_deref()
2303            .or(edge.branch.as_deref())
2304            .map(mermaid_label);
2305        match label {
2306            Some(label) if !label.is_empty() => lines.push(format!(
2307                "  {} -->|{}| {}",
2308                mermaid_id(&edge.from),
2309                label,
2310                mermaid_id(&edge.to)
2311            )),
2312            _ => lines.push(format!(
2313                "  {} --> {}",
2314                mermaid_id(&edge.from),
2315                mermaid_id(&edge.to)
2316            )),
2317        }
2318    }
2319    lines.join("\n")
2320}
2321
2322fn mermaid_id(value: &str) -> String {
2323    let digest = Sha256::digest(value.as_bytes());
2324    let suffix = digest
2325        .iter()
2326        .take(4)
2327        .map(|byte| format!("{byte:02x}"))
2328        .collect::<String>();
2329    let mut out = format!("n_{suffix}_");
2330    for ch in value.chars() {
2331        if ch.is_ascii_alphanumeric() {
2332            out.push(ch);
2333        } else {
2334            out.push('_');
2335        }
2336    }
2337    out
2338}
2339
/// Escapes display text for use inside a Mermaid flowchart label, either a
/// quoted node label (`["..."]`) or an edge label (`-->|...|`).
///
/// Mermaid quoted strings have no backslash escape sequences, so special
/// characters are written as Mermaid entity codes instead:
///
/// * `"` becomes `#quot;` (a raw quote would end the label early),
/// * `|` becomes `#124;` (a raw pipe would end an edge label early),
/// * `#` becomes `#35;` (so literal text is never read as an entity code).
///
/// Line breaks (`\n`, `\r`, and `\r\n` as a unit) become a single space so the
/// label stays on one line of the diagram source. Backslashes are passed
/// through unchanged because Mermaid treats them literally.
fn mermaid_label(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    let mut chars = value.chars().peekable();
    while let Some(ch) = chars.next() {
        match ch {
            '#' => escaped.push_str("#35;"),
            '"' => escaped.push_str("#quot;"),
            '|' => escaped.push_str("#124;"),
            '\n' => escaped.push(' '),
            '\r' => {
                // Let the `\n` of a CRLF pair emit the single replacement space.
                if chars.peek() != Some(&'\n') {
                    escaped.push(' ');
                }
            }
            other => escaped.push(other),
        }
    }
    escaped
}
2346
2347fn triggers_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, Vec<String>> {
2348    let mut by_node: BTreeMap<String, Vec<String>> = BTreeMap::new();
2349    for trigger in &bundle.triggers {
2350        if let Some(node_id) = trigger.node_id.as_ref() {
2351            by_node
2352                .entry(node_id.clone())
2353                .or_default()
2354                .push(trigger.id.clone());
2355        }
2356    }
2357    by_node
2358}
2359
2360fn capsules_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, String> {
2361    bundle
2362        .prompt_capsules
2363        .iter()
2364        .map(|(id, capsule)| (capsule.node_id.clone(), id.clone()))
2365        .collect()
2366}
2367
2368fn execution_order(graph: &WorkflowGraph) -> Vec<String> {
2369    let outgoing =
2370        graph
2371            .edges
2372            .iter()
2373            .fold(BTreeMap::<String, Vec<String>>::new(), |mut acc, edge| {
2374                acc.entry(edge.from.clone())
2375                    .or_default()
2376                    .push(edge.to.clone());
2377                acc
2378            });
2379    let mut seen = BTreeSet::new();
2380    let mut queue = VecDeque::from([graph.entry.clone()]);
2381    let mut order = Vec::new();
2382    while let Some(node_id) = queue.pop_front() {
2383        if !graph.nodes.contains_key(&node_id) || !seen.insert(node_id.clone()) {
2384            continue;
2385        }
2386        order.push(node_id.clone());
2387        if let Some(next) = outgoing.get(&node_id) {
2388            let mut next = next.clone();
2389            next.sort();
2390            for child in next {
2391                queue.push_back(child);
2392            }
2393        }
2394    }
2395    order
2396}
2397
2398fn default_run_id(bundle: &WorkflowBundle, graph_digest: &str) -> String {
2399    let suffix = graph_digest
2400        .strip_prefix("sha256:")
2401        .unwrap_or(graph_digest)
2402        .chars()
2403        .take(12)
2404        .collect::<String>();
2405    format!("bundle_run_{}_{}", sanitize_id(&bundle.id), suffix)
2406}
2407
/// Replaces every character of `value` that is not an ASCII letter, ASCII
/// digit, `_`, or `-` with `_`.
fn sanitize_id(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-');
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
2420
2421fn push_error(
2422    report: &mut WorkflowBundleValidationReport,
2423    path: impl Into<String>,
2424    message: impl Into<String>,
2425    node_id: Option<String>,
2426) {
2427    report.errors.push(WorkflowBundleDiagnostic {
2428        severity: "error".to_string(),
2429        path: path.into(),
2430        message: message.into(),
2431        node_id,
2432    });
2433}
2434
2435fn push_warning(
2436    report: &mut WorkflowBundleValidationReport,
2437    path: impl Into<String>,
2438    message: impl Into<String>,
2439    node_id: Option<String>,
2440) {
2441    report.warnings.push(WorkflowBundleDiagnostic {
2442        severity: "warning".to_string(),
2443        path: path.into(),
2444        message: message.into(),
2445        node_id,
2446    });
2447}
2448
2449#[cfg(test)]
2450#[path = "workflow_bundle_tests.rs"]
2451mod workflow_bundle_tests;