Skip to main content

harn_vm/orchestration/workflow_bundle/
mod.rs

1//! Portable workflow bundle contract, `.harnpack` container helpers, and
2//! deterministic local receipts.
3
4use std::collections::{BTreeMap, BTreeSet, VecDeque};
5use std::fs;
6use std::io::{Cursor, Read};
7use std::path::{Component, Path, PathBuf};
8
9use ed25519_dalek::{Signature, Verifier, VerifyingKey};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12
13use super::{validate_workflow, WorkflowEdge, WorkflowGraph};
14use crate::tool_annotations::ToolAnnotations;
15
16pub const WORKFLOW_BUNDLE_SCHEMA_VERSION: u32 = 2;
17pub const WORKFLOW_BUNDLE_RECEIPT_TYPE: &str = "harn.workflow_bundle.run";
18pub const HARNPACK_MANIFEST_PATH: &str = "harnpack.json";
19
20const DEFAULT_HARNPACK_FILE_MODE: u32 = 0o644;
21
22/// Maximum decompressed size of a `.harnpack` archive. Matches the
23/// VM-level decompression cap in `stdlib/compression.rs`. Keeps a
24/// malicious bundle from exhausting memory during ingest (workflow
25/// bundles ride this exact path when harn-cloud accepts a plan
26/// transfer).
27const MAX_HARNPACK_DECOMPRESSED_BYTES: u64 = 100 * 1024 * 1024;
28
29/// Decompress zstd-compressed harnpack bytes, refusing to produce more
30/// than [`MAX_HARNPACK_DECOMPRESSED_BYTES`] of output. Returns
31/// [`WorkflowBundleErrorKind::InvalidArchive`] on overflow so callers
32/// can surface a clean rejection instead of OOMing the host.
33fn decompress_harnpack_zstd(bytes: &[u8]) -> Result<Vec<u8>, WorkflowBundleError> {
34    let mut decoder = zstd::stream::Decoder::new(Cursor::new(bytes))?;
35    let mut output = Vec::new();
36    let mut limited = (&mut decoder).take(MAX_HARNPACK_DECOMPRESSED_BYTES.saturating_add(1));
37    limited.read_to_end(&mut output)?;
38    if output.len() as u64 > MAX_HARNPACK_DECOMPRESSED_BYTES {
39        return Err(WorkflowBundleError::new(
40            WorkflowBundleErrorKind::InvalidArchive,
41            format!(
42                "harnpack zstd payload decompressed past max size ({MAX_HARNPACK_DECOMPRESSED_BYTES} bytes); refusing to extract"
43            ),
44        ));
45    }
46    Ok(output)
47}
48
49#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
50#[serde(default)]
51pub struct WorkflowBundle {
52    pub schema_version: u32,
53    pub entrypoint: PathBuf,
54    pub transitive_modules: Vec<ModuleEntry>,
55    pub stdlib_version: String,
56    pub harn_version: String,
57    pub provider_catalog_hash: String,
58    pub tool_manifest: Vec<ToolEntry>,
59    pub sbom: SBOMDoc,
60    pub signature: Option<Ed25519Signature>,
61    pub parent_trust_record_id: Option<String>,
62    pub id: String,
63    pub name: Option<String>,
64    pub version: String,
65    pub triggers: Vec<WorkflowBundleTrigger>,
66    pub workflow: WorkflowGraph,
67    pub prompt_capsules: BTreeMap<String, PromptCapsule>,
68    pub policy: WorkflowBundlePolicy,
69    pub connectors: Vec<ConnectorRequirement>,
70    pub environment: EnvironmentRequirements,
71    pub receipts: WorkflowBundleReplayMetadata,
72    pub metadata: BTreeMap<String, serde_json::Value>,
73}
74
75impl Default for WorkflowBundle {
76    fn default() -> Self {
77        Self {
78            schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
79            entrypoint: PathBuf::new(),
80            transitive_modules: Vec::new(),
81            stdlib_version: env!("CARGO_PKG_VERSION").to_string(),
82            harn_version: env!("CARGO_PKG_VERSION").to_string(),
83            provider_catalog_hash: String::new(),
84            tool_manifest: Vec::new(),
85            sbom: SBOMDoc::default(),
86            signature: None,
87            parent_trust_record_id: None,
88            id: String::new(),
89            name: None,
90            version: String::new(),
91            triggers: Vec::new(),
92            workflow: WorkflowGraph::default(),
93            prompt_capsules: BTreeMap::new(),
94            policy: WorkflowBundlePolicy::default(),
95            connectors: Vec::new(),
96            environment: EnvironmentRequirements::default(),
97            receipts: WorkflowBundleReplayMetadata::default(),
98            metadata: BTreeMap::new(),
99        }
100    }
101}
102
103#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
104#[serde(default)]
105pub struct ModuleEntry {
106    pub path: PathBuf,
107    pub source_hash_blake3: String,
108    pub harnbc_hash_blake3: String,
109}
110
111#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
112#[serde(default)]
113pub struct ToolEntry {
114    pub name: String,
115    pub provider: Option<String>,
116    pub annotations: Option<ToolAnnotations>,
117    pub schema_hash_blake3: Option<String>,
118    pub metadata: BTreeMap<String, String>,
119}
120
121#[allow(clippy::upper_case_acronyms)]
122#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
123#[serde(default)]
124pub struct SBOMDoc {
125    pub format: String,
126    pub version: String,
127    pub packages: Vec<SBOMPackage>,
128    pub relationships: Vec<SBOMRelationship>,
129}
130
131#[allow(clippy::upper_case_acronyms)]
132#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
133#[serde(default)]
134pub struct SBOMPackage {
135    pub name: String,
136    pub version: Option<String>,
137    pub package_hash_blake3: Option<String>,
138    pub license: Option<String>,
139}
140
141#[allow(clippy::upper_case_acronyms)]
142#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
143#[serde(default)]
144pub struct SBOMRelationship {
145    pub from: String,
146    pub to: String,
147    pub relationship_type: String,
148}
149
150#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
151#[serde(default)]
152pub struct Ed25519Signature {
153    pub key_id: Option<String>,
154    pub public_key: String,
155    pub signature: String,
156    pub manifest_hash_blake3: String,
157    pub algorithm: String,
158}
159
160#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
161#[serde(default)]
162pub struct WorkflowBundleTrigger {
163    pub id: String,
164    pub kind: String,
165    pub provider: Option<String>,
166    pub events: Vec<String>,
167    pub schedule: Option<String>,
168    pub delay: Option<String>,
169    pub webhook_path: Option<String>,
170    pub mcp_tool: Option<String>,
171    pub resume_key: Option<String>,
172    pub node_id: Option<String>,
173    pub metadata: BTreeMap<String, String>,
174}
175
176#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
177#[serde(default)]
178pub struct PromptCapsule {
179    pub id: String,
180    pub node_id: String,
181    pub trigger_id: Option<String>,
182    pub prompt: String,
183    pub system: Option<String>,
184    pub context: BTreeMap<String, serde_json::Value>,
185}
186
187#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
188#[serde(default)]
189pub struct WorkflowBundlePolicy {
190    pub autonomy_tier: String,
191    pub tool_policy: BTreeMap<String, serde_json::Value>,
192    pub approval_required: Vec<String>,
193    pub retry: RetryPolicySpec,
194    pub catchup: CatchupPolicySpec,
195}
196
197impl Default for WorkflowBundlePolicy {
198    fn default() -> Self {
199        Self {
200            autonomy_tier: "act_with_approval".to_string(),
201            tool_policy: BTreeMap::new(),
202            approval_required: Vec::new(),
203            retry: RetryPolicySpec::default(),
204            catchup: CatchupPolicySpec::default(),
205        }
206    }
207}
208
209#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
210#[serde(default)]
211pub struct RetryPolicySpec {
212    pub max_attempts: u32,
213    pub backoff: String,
214}
215
216impl Default for RetryPolicySpec {
217    fn default() -> Self {
218        Self {
219            max_attempts: 1,
220            backoff: "none".to_string(),
221        }
222    }
223}
224
225#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
226#[serde(default)]
227pub struct CatchupPolicySpec {
228    pub mode: String,
229    pub max_events: Option<u32>,
230}
231
232impl Default for CatchupPolicySpec {
233    fn default() -> Self {
234        Self {
235            mode: "latest".to_string(),
236            max_events: Some(1),
237        }
238    }
239}
240
241#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
242#[serde(default)]
243pub struct ConnectorRequirement {
244    pub id: String,
245    pub provider_id: String,
246    pub scopes: Vec<String>,
247    pub setup_required: bool,
248    pub status_required: bool,
249}
250
251#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
252#[serde(default)]
253pub struct EnvironmentRequirements {
254    pub repo_setup_profile: Option<String>,
255    pub worktree_policy: String,
256    pub command_gates: Vec<String>,
257}
258
259impl Default for EnvironmentRequirements {
260    fn default() -> Self {
261        Self {
262            repo_setup_profile: None,
263            worktree_policy: "host_managed".to_string(),
264            command_gates: Vec::new(),
265        }
266    }
267}
268
269#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
270#[serde(default)]
271pub struct WorkflowBundleReplayMetadata {
272    pub run_id: Option<String>,
273    pub event_ids: Vec<String>,
274    pub workflow_version: Option<usize>,
275    pub graph_digest: Option<String>,
276}
277
278#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
279#[serde(default)]
280pub struct WorkflowBundleDiagnostic {
281    pub severity: String,
282    pub path: String,
283    pub message: String,
284    pub node_id: Option<String>,
285}
286
287#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
288#[serde(default)]
289pub struct WorkflowBundleValidationReport {
290    pub valid: bool,
291    pub bundle_id: String,
292    pub workflow_id: String,
293    pub graph_digest: String,
294    pub errors: Vec<WorkflowBundleDiagnostic>,
295    pub warnings: Vec<WorkflowBundleDiagnostic>,
296}
297
298#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
299pub struct WorkflowBundlePreview {
300    pub schema_version: u32,
301    pub bundle_id: String,
302    pub bundle_version: String,
303    pub workflow_id: String,
304    pub workflow_version: usize,
305    pub graph_digest: String,
306    pub validation: WorkflowBundleValidationReport,
307    pub graph: WorkflowBundleGraphExport,
308    pub mermaid: String,
309    pub triggers: Vec<WorkflowBundleTrigger>,
310    pub connectors: Vec<ConnectorRequirement>,
311    pub environment: EnvironmentRequirements,
312    pub nodes: Vec<WorkflowBundlePreviewNode>,
313    pub edges: Vec<WorkflowEdge>,
314    /// Pass-through of the signed bundle's `metadata` map. Hosts read the
315    /// `contributes` and `extension` keys (populated by `harn pack`) to
316    /// discover host-surface extension contributions from the verified
317    /// preview without unpacking the archive.
318    #[serde(default)]
319    pub metadata: BTreeMap<String, serde_json::Value>,
320}
321
322#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
323pub struct WorkflowBundlePreviewNode {
324    pub id: String,
325    pub kind: String,
326    pub label: Option<String>,
327    pub prompt_capsule: Option<String>,
328    pub trigger_ids: Vec<String>,
329    pub outgoing: Vec<String>,
330}
331
332#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
333pub struct WorkflowBundleGraphExport {
334    pub schema_version: u32,
335    pub graph_id: String,
336    pub graph_digest: String,
337    pub nodes: Vec<WorkflowBundleGraphNode>,
338    pub edges: Vec<WorkflowBundleGraphEdge>,
339    pub diagnostics: Vec<WorkflowBundleGraphDiagnostic>,
340    pub editable_fields: Vec<WorkflowBundleEditableField>,
341    pub mermaid: String,
342}
343
344#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
345pub struct WorkflowBundleGraphNode {
346    pub id: String,
347    pub node_type: String,
348    pub label: String,
349    pub workflow_node_id: Option<String>,
350    pub trigger_id: Option<String>,
351    pub connector_id: Option<String>,
352    pub editable_fields: Vec<WorkflowBundleEditableField>,
353    pub metadata: BTreeMap<String, serde_json::Value>,
354}
355
356#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
357pub struct WorkflowBundleGraphEdge {
358    pub from: String,
359    pub to: String,
360    pub label: Option<String>,
361    pub branch: Option<String>,
362}
363
364#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
365pub struct WorkflowBundleGraphDiagnostic {
366    pub severity: String,
367    pub path: String,
368    pub message: String,
369    pub node_id: Option<String>,
370    pub graph_node_id: Option<String>,
371}
372
373#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
374pub struct WorkflowBundleEditableField {
375    pub id: String,
376    pub label: String,
377    pub json_pointer: String,
378    pub value_type: String,
379    pub required: bool,
380    pub enum_values: Vec<String>,
381}
382
383#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
384#[serde(default)]
385pub struct WorkflowBundleRunRequest {
386    pub trigger_id: Option<String>,
387    pub event_id: Option<String>,
388}
389
390#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
391pub struct WorkflowBundleRunReceipt {
392    pub schema_version: u32,
393    pub receipt_type: String,
394    pub bundle_id: String,
395    pub bundle_version: String,
396    pub workflow_id: String,
397    pub workflow_version: usize,
398    pub graph_digest: String,
399    pub run_id: String,
400    pub trigger_id: Option<String>,
401    pub event_ids: Vec<String>,
402    pub status: String,
403    pub executed_nodes: Vec<WorkflowBundleRunNodeReceipt>,
404    pub policy: WorkflowBundlePolicy,
405    pub connectors: Vec<ConnectorRequirement>,
406    pub environment: EnvironmentRequirements,
407}
408
409#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
410pub struct WorkflowBundleRunNodeReceipt {
411    pub node_id: String,
412    pub kind: String,
413    pub prompt_capsule: Option<String>,
414    pub status: String,
415}
416
417#[derive(Clone, Debug, PartialEq, Eq)]
418pub struct HarnpackEntry {
419    pub path: PathBuf,
420    pub bytes: Vec<u8>,
421    pub mode: u32,
422}
423
424impl HarnpackEntry {
425    pub fn new(path: impl Into<PathBuf>, bytes: impl Into<Vec<u8>>) -> Self {
426        Self {
427            path: path.into(),
428            bytes: bytes.into(),
429            mode: DEFAULT_HARNPACK_FILE_MODE,
430        }
431    }
432
433    pub fn with_mode(mut self, mode: u32) -> Self {
434        self.mode = mode;
435        self
436    }
437}
438
439#[derive(Clone, Debug, PartialEq)]
440pub struct HarnpackArchive {
441    pub manifest: WorkflowBundle,
442    pub contents: Vec<HarnpackEntry>,
443}
444
445#[derive(Clone, Debug, PartialEq, Eq)]
446pub enum WorkflowBundleErrorKind {
447    Io,
448    Json,
449    MissingSchemaVersion,
450    UnsupportedSchemaVersion { actual: u32, expected: u32 },
451    InvalidArchive,
452    DuplicateArchiveEntry,
453    UnsafeArchivePath,
454    InvalidSignature,
455}
456
457#[derive(Clone, Debug, PartialEq, Eq)]
458pub struct WorkflowBundleError {
459    pub kind: WorkflowBundleErrorKind,
460    pub message: String,
461}
462
463impl WorkflowBundleError {
464    fn new(kind: WorkflowBundleErrorKind, message: impl Into<String>) -> Self {
465        Self {
466            kind,
467            message: message.into(),
468        }
469    }
470
471    fn unsupported_schema_version(actual: u32) -> Self {
472        Self::new(
473            WorkflowBundleErrorKind::UnsupportedSchemaVersion {
474                actual,
475                expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
476            },
477            format!(
478                "unsupported workflow bundle schema_version {actual}; expected {WORKFLOW_BUNDLE_SCHEMA_VERSION}"
479            ),
480        )
481    }
482}
483
484impl std::fmt::Display for WorkflowBundleError {
485    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
486        self.message.fmt(f)
487    }
488}
489
490impl std::error::Error for WorkflowBundleError {}
491
492impl From<std::io::Error> for WorkflowBundleError {
493    fn from(error: std::io::Error) -> Self {
494        Self::new(WorkflowBundleErrorKind::Io, error.to_string())
495    }
496}
497
498impl From<serde_json::Error> for WorkflowBundleError {
499    fn from(error: serde_json::Error) -> Self {
500        Self::new(WorkflowBundleErrorKind::Json, error.to_string())
501    }
502}
503
504pub fn load_workflow_bundle(path: &Path) -> Result<WorkflowBundle, WorkflowBundleError> {
505    let bytes = fs::read(path)?;
506    if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
507        read_harnpack(&bytes).map(|archive| archive.manifest)
508    } else {
509        parse_workflow_bundle_manifest(&bytes)
510    }
511}
512
513/// Read a manifest from any prior or current schema version without
514/// rejecting on a `schema_version` mismatch. Used by `harn pack
515/// --upgrade` to read v1 bundles before re-emitting them under the v2
516/// shape. Fields the older schema didn't carry deserialize to their
517/// type defaults via `#[serde(default)]`.
518pub fn read_workflow_bundle_manifest_any_version(
519    bytes: &[u8],
520) -> Result<WorkflowBundle, WorkflowBundleError> {
521    let value: serde_json::Value = serde_json::from_slice(bytes)?;
522    if value.get("schema_version").is_none() {
523        return Err(WorkflowBundleError::new(
524            WorkflowBundleErrorKind::MissingSchemaVersion,
525            "workflow bundle manifest is missing schema_version",
526        ));
527    }
528    serde_json::from_value(value).map_err(Into::into)
529}
530
531/// Variant of [`load_workflow_bundle`] that accepts any historical
532/// schema version. Reads the manifest from a `.harnpack` archive or a
533/// bare JSON manifest as appropriate.
534pub fn load_workflow_bundle_any_version(
535    path: &Path,
536) -> Result<WorkflowBundle, WorkflowBundleError> {
537    let bytes = fs::read(path)?;
538    if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
539        let tar_bytes = decompress_harnpack_zstd(&bytes)?;
540        let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
541        for entry in archive.entries()? {
542            let mut entry = entry?;
543            if entry.header().entry_type().is_dir() {
544                continue;
545            }
546            let path = normalize_archive_path(entry.path()?.as_ref())?;
547            if path == HARNPACK_MANIFEST_PATH {
548                let mut entry_bytes = Vec::new();
549                entry.read_to_end(&mut entry_bytes)?;
550                return read_workflow_bundle_manifest_any_version(&entry_bytes);
551            }
552        }
553        Err(WorkflowBundleError::new(
554            WorkflowBundleErrorKind::InvalidArchive,
555            format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
556        ))
557    } else {
558        read_workflow_bundle_manifest_any_version(&bytes)
559    }
560}
561
562pub fn parse_workflow_bundle_manifest(bytes: &[u8]) -> Result<WorkflowBundle, WorkflowBundleError> {
563    let value: serde_json::Value = serde_json::from_slice(bytes)?;
564    let schema_version = value
565        .get("schema_version")
566        .and_then(serde_json::Value::as_u64)
567        .ok_or_else(|| {
568            WorkflowBundleError::new(
569                WorkflowBundleErrorKind::MissingSchemaVersion,
570                "workflow bundle manifest is missing numeric schema_version",
571            )
572        })?;
573    let actual = u32::try_from(schema_version).map_err(|_| {
574        WorkflowBundleError::new(
575            WorkflowBundleErrorKind::UnsupportedSchemaVersion {
576                actual: u32::MAX,
577                expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
578            },
579            format!(
580                "unsupported workflow bundle schema_version {schema_version}; expected {WORKFLOW_BUNDLE_SCHEMA_VERSION}"
581            ),
582        )
583    })?;
584    if actual != WORKFLOW_BUNDLE_SCHEMA_VERSION {
585        return Err(WorkflowBundleError::unsupported_schema_version(actual));
586    }
587    serde_json::from_value(value).map_err(Into::into)
588}
589
590pub fn canonical_workflow_bundle_manifest_bytes(
591    bundle: &WorkflowBundle,
592) -> Result<Vec<u8>, WorkflowBundleError> {
593    serde_json::to_vec(&canonical_workflow_bundle_manifest(bundle)).map_err(Into::into)
594}
595
596pub fn workflow_bundle_hash(
597    bundle: &WorkflowBundle,
598    contents: &[HarnpackEntry],
599) -> Result<String, WorkflowBundleError> {
600    let mut hasher = blake3::Hasher::new();
601    let mut canonical = canonical_workflow_bundle_manifest(bundle);
602    canonical.signature = None;
603    let manifest_bytes = serde_json::to_vec(&canonical)?;
604    hasher.update(&manifest_bytes);
605
606    let mut content_hashes = contents
607        .iter()
608        .map(|entry| blake3_hash_bytes(&entry.bytes))
609        .collect::<Vec<_>>();
610    content_hashes.sort();
611    for content_hash in content_hashes {
612        hasher.update(b"\n");
613        hasher.update(content_hash.as_bytes());
614    }
615
616    Ok(blake3_digest_string(hasher.finalize()))
617}
618
619pub fn verify_workflow_bundle_signature(
620    bundle: &WorkflowBundle,
621    contents: &[HarnpackEntry],
622) -> Result<(), WorkflowBundleError> {
623    let signature = bundle.signature.as_ref().ok_or_else(|| {
624        WorkflowBundleError::new(
625            WorkflowBundleErrorKind::InvalidSignature,
626            "workflow bundle is unsigned",
627        )
628    })?;
629    if signature.algorithm != "ed25519" {
630        return Err(WorkflowBundleError::new(
631            WorkflowBundleErrorKind::InvalidSignature,
632            format!(
633                "unsupported workflow bundle signature algorithm {}",
634                signature.algorithm
635            ),
636        ));
637    }
638    let expected_hash = workflow_bundle_hash(bundle, contents)?;
639    if signature.manifest_hash_blake3 != expected_hash {
640        return Err(WorkflowBundleError::new(
641            WorkflowBundleErrorKind::InvalidSignature,
642            format!(
643                "workflow bundle signature hash mismatch; expected {expected_hash}, found {}",
644                signature.manifest_hash_blake3
645            ),
646        ));
647    }
648    let public_key_bytes = decode_hex_exact::<32>(
649        "workflow bundle signature public_key",
650        &signature.public_key,
651    )?;
652    let signature_bytes =
653        decode_hex_exact::<64>("workflow bundle signature", &signature.signature)?;
654    let verifying_key = VerifyingKey::from_bytes(&public_key_bytes).map_err(|error| {
655        WorkflowBundleError::new(
656            WorkflowBundleErrorKind::InvalidSignature,
657            format!("workflow bundle signature public_key is invalid Ed25519: {error}"),
658        )
659    })?;
660    let ed25519_signature = Signature::from_bytes(&signature_bytes);
661    verifying_key
662        .verify(expected_hash.as_bytes(), &ed25519_signature)
663        .map_err(|error| {
664            WorkflowBundleError::new(
665                WorkflowBundleErrorKind::InvalidSignature,
666                format!("workflow bundle signature failed Ed25519 verification: {error}"),
667            )
668        })
669}
670
671pub fn build_harnpack(
672    bundle: &WorkflowBundle,
673    contents: &[HarnpackEntry],
674) -> Result<Vec<u8>, WorkflowBundleError> {
675    let manifest_bytes = canonical_workflow_bundle_manifest_bytes(bundle)?;
676    let mut entries = contents
677        .iter()
678        .map(|entry| {
679            normalize_archive_path(&entry.path).map(|path| (path, entry.bytes.clone(), entry.mode))
680        })
681        .collect::<Result<Vec<_>, _>>()?;
682    entries.sort_by(|left, right| left.0.cmp(&right.0));
683
684    let mut seen = BTreeSet::new();
685    for (path, _, _) in &entries {
686        if path == HARNPACK_MANIFEST_PATH {
687            return Err(WorkflowBundleError::new(
688                WorkflowBundleErrorKind::DuplicateArchiveEntry,
689                format!("archive content cannot replace {HARNPACK_MANIFEST_PATH}"),
690            ));
691        }
692        if !seen.insert(path.clone()) {
693            return Err(WorkflowBundleError::new(
694                WorkflowBundleErrorKind::DuplicateArchiveEntry,
695                format!("duplicate archive entry: {path}"),
696            ));
697        }
698    }
699
700    let mut tar_bytes = Vec::new();
701    {
702        let mut builder = tar::Builder::new(&mut tar_bytes);
703        append_harnpack_entry(
704            &mut builder,
705            HARNPACK_MANIFEST_PATH,
706            &manifest_bytes,
707            DEFAULT_HARNPACK_FILE_MODE,
708        )?;
709        for (path, bytes, mode) in entries {
710            append_harnpack_entry(&mut builder, &path, &bytes, mode)?;
711        }
712        builder.finish()?;
713    }
714
715    zstd::stream::encode_all(Cursor::new(tar_bytes), 0).map_err(Into::into)
716}
717
718pub fn read_harnpack(bytes: &[u8]) -> Result<HarnpackArchive, WorkflowBundleError> {
719    let tar_bytes = decompress_harnpack_zstd(bytes)?;
720    let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
721    let mut manifest = None;
722    let mut contents = Vec::new();
723    let mut seen = BTreeSet::new();
724
725    for entry in archive.entries()? {
726        let mut entry = entry?;
727        if entry.header().entry_type().is_dir() {
728            continue;
729        }
730        let path = normalize_archive_path(entry.path()?.as_ref())?;
731        if !seen.insert(path.clone()) {
732            return Err(WorkflowBundleError::new(
733                WorkflowBundleErrorKind::DuplicateArchiveEntry,
734                format!("duplicate archive entry: {path}"),
735            ));
736        }
737        let mode = entry.header().mode().unwrap_or(DEFAULT_HARNPACK_FILE_MODE);
738        let mut entry_bytes = Vec::new();
739        entry.read_to_end(&mut entry_bytes)?;
740
741        if path == HARNPACK_MANIFEST_PATH {
742            manifest = Some(parse_workflow_bundle_manifest(&entry_bytes)?);
743        } else {
744            contents.push(HarnpackEntry {
745                path: PathBuf::from(path),
746                bytes: entry_bytes,
747                mode,
748            });
749        }
750    }
751
752    contents.sort_by(|left, right| left.path.cmp(&right.path));
753    Ok(HarnpackArchive {
754        manifest: manifest.ok_or_else(|| {
755            WorkflowBundleError::new(
756                WorkflowBundleErrorKind::InvalidArchive,
757                format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
758            )
759        })?,
760        contents,
761    })
762}
763
764pub fn current_provider_catalog_hash_blake3() -> Result<String, WorkflowBundleError> {
765    let bytes = serde_json::to_vec(&crate::provider_catalog::artifact())?;
766    Ok(blake3_hash_bytes(&bytes))
767}
768
769pub fn workflow_graph_digest(graph: &WorkflowGraph) -> String {
770    let mut canonical = canonical_workflow_graph(graph);
771    canonical.audit_log.clear();
772    let bytes = serde_json::to_vec(&canonical).expect("workflow graph serializes");
773    let digest = Sha256::digest(bytes);
774    let hex = digest
775        .iter()
776        .map(|byte| format!("{byte:02x}"))
777        .collect::<String>();
778    format!("sha256:{hex}")
779}
780
781fn canonical_workflow_bundle_manifest(bundle: &WorkflowBundle) -> WorkflowBundle {
782    let mut canonical = bundle.clone();
783    canonical.workflow = canonical_workflow_graph(&bundle.workflow);
784    canonical.transitive_modules.sort_by(|left, right| {
785        (
786            path_sort_key(&left.path),
787            &left.source_hash_blake3,
788            &left.harnbc_hash_blake3,
789        )
790            .cmp(&(
791                path_sort_key(&right.path),
792                &right.source_hash_blake3,
793                &right.harnbc_hash_blake3,
794            ))
795    });
796    canonical.tool_manifest.sort_by(|left, right| {
797        (&left.name, &left.provider, &left.schema_hash_blake3).cmp(&(
798            &right.name,
799            &right.provider,
800            &right.schema_hash_blake3,
801        ))
802    });
803    canonical.sbom.packages.sort_by(|left, right| {
804        (&left.name, &left.version, &left.package_hash_blake3).cmp(&(
805            &right.name,
806            &right.version,
807            &right.package_hash_blake3,
808        ))
809    });
810    canonical.sbom.relationships.sort_by(|left, right| {
811        (&left.from, &left.to, &left.relationship_type).cmp(&(
812            &right.from,
813            &right.to,
814            &right.relationship_type,
815        ))
816    });
817    canonical
818}
819
820fn decode_hex_exact<const N: usize>(
821    label: &str,
822    value: &str,
823) -> Result<[u8; N], WorkflowBundleError> {
824    let bytes = hex::decode(value).map_err(|error| {
825        WorkflowBundleError::new(
826            WorkflowBundleErrorKind::InvalidSignature,
827            format!("{label} is not hex: {error}"),
828        )
829    })?;
830    bytes.try_into().map_err(|bytes: Vec<u8>| {
831        WorkflowBundleError::new(
832            WorkflowBundleErrorKind::InvalidSignature,
833            format!("{label} must decode to {N} bytes, got {}", bytes.len()),
834        )
835    })
836}
837
838fn append_harnpack_entry<W: std::io::Write>(
839    builder: &mut tar::Builder<W>,
840    path: &str,
841    bytes: &[u8],
842    mode: u32,
843) -> Result<(), WorkflowBundleError> {
844    let mut header = tar::Header::new_gnu();
845    header.set_path(path).map_err(|error| {
846        WorkflowBundleError::new(
847            WorkflowBundleErrorKind::UnsafeArchivePath,
848            format!("invalid archive path {path}: {error}"),
849        )
850    })?;
851    header.set_entry_type(tar::EntryType::Regular);
852    header.set_size(bytes.len() as u64);
853    header.set_mode(mode);
854    header.set_mtime(0);
855    header.set_uid(0);
856    header.set_gid(0);
857    header.set_cksum();
858    builder.append(&header, bytes)?;
859    Ok(())
860}
861
862fn normalize_archive_path(path: &Path) -> Result<String, WorkflowBundleError> {
863    let mut parts = Vec::new();
864    for component in path.components() {
865        match component {
866            Component::Normal(part) => {
867                let Some(part) = part.to_str() else {
868                    return Err(WorkflowBundleError::new(
869                        WorkflowBundleErrorKind::UnsafeArchivePath,
870                        format!("archive path is not valid UTF-8: {}", path.display()),
871                    ));
872                };
873                if part.is_empty() {
874                    return Err(WorkflowBundleError::new(
875                        WorkflowBundleErrorKind::UnsafeArchivePath,
876                        "archive path contains an empty component",
877                    ));
878                }
879                parts.push(part.to_string());
880            }
881            Component::CurDir => {}
882            Component::ParentDir | Component::RootDir | Component::Prefix(_) => {
883                return Err(WorkflowBundleError::new(
884                    WorkflowBundleErrorKind::UnsafeArchivePath,
885                    format!(
886                        "archive path must be relative and contained: {}",
887                        path.display()
888                    ),
889                ));
890            }
891        }
892    }
893    if parts.is_empty() {
894        return Err(WorkflowBundleError::new(
895            WorkflowBundleErrorKind::UnsafeArchivePath,
896            "archive path is empty",
897        ));
898    }
899    Ok(parts.join("/"))
900}
901
902fn path_sort_key(path: &Path) -> String {
903    path.components()
904        .filter_map(|component| match component {
905            Component::Normal(part) => part.to_str().map(ToOwned::to_owned),
906            _ => None,
907        })
908        .collect::<Vec<_>>()
909        .join("/")
910}
911
912fn blake3_hash_bytes(bytes: &[u8]) -> String {
913    blake3_digest_string(blake3::hash(bytes))
914}
915
916fn blake3_digest_string(hash: blake3::Hash) -> String {
917    format!("blake3:{hash}")
918}
919
920pub fn validate_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundleValidationReport {
921    let canonical = canonical_workflow_graph(&bundle.workflow);
922    let mut report = WorkflowBundleValidationReport {
923        valid: true,
924        bundle_id: bundle.id.clone(),
925        workflow_id: canonical.id.clone(),
926        graph_digest: workflow_graph_digest(&canonical),
927        errors: Vec::new(),
928        warnings: Vec::new(),
929    };
930
931    validate_manifest_contract(bundle, &mut report);
932    validate_bundle_identity(bundle, &canonical, &mut report);
933    validate_triggers(bundle, &canonical, &mut report);
934    validate_prompt_capsules(bundle, &canonical, &mut report);
935    validate_policy(bundle, &mut report);
936    validate_connectors(bundle, &mut report);
937    validate_environment(bundle, &mut report);
938
939    let graph_report = validate_workflow(&canonical, None);
940    for error in graph_report.errors {
941        let node_id = workflow_diagnostic_node_id(&error, &canonical);
942        push_error(&mut report, "workflow", error, node_id);
943    }
944    for warning in graph_report.warnings {
945        let node_id = workflow_diagnostic_node_id(&warning, &canonical);
946        push_warning(&mut report, "workflow", warning, node_id);
947    }
948
949    if let Some(expected) = bundle.receipts.graph_digest.as_deref() {
950        if expected != report.graph_digest {
951            let actual = report.graph_digest.clone();
952            push_error(
953                &mut report,
954                "receipts.graph_digest",
955                format!("graph digest mismatch: expected {expected}, computed {actual}"),
956                None,
957            );
958        }
959    }
960    if let Some(expected_version) = bundle.receipts.workflow_version {
961        if expected_version != canonical.version {
962            push_error(
963                &mut report,
964                "receipts.workflow_version",
965                format!(
966                    "workflow version mismatch: expected {expected_version}, computed {}",
967                    canonical.version
968                ),
969                None,
970            );
971        }
972    }
973
974    report.valid = report.errors.is_empty();
975    report
976}
977
978pub fn preview_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundlePreview {
979    let canonical = canonical_workflow_graph(&bundle.workflow);
980    let validation = validate_workflow_bundle(bundle);
981    let graph = export_workflow_bundle_graph(bundle, &validation);
982    let mermaid = graph.mermaid.clone();
983    let triggers_by_node = triggers_by_node(bundle);
984    let capsules_by_node = capsules_by_node(bundle);
985    let mut nodes = Vec::new();
986
987    for (node_id, node) in &canonical.nodes {
988        let mut outgoing = canonical
989            .edges
990            .iter()
991            .filter(|edge| edge.from == *node_id)
992            .map(|edge| edge.to.clone())
993            .collect::<Vec<_>>();
994        outgoing.sort();
995        outgoing.dedup();
996        nodes.push(WorkflowBundlePreviewNode {
997            id: node_id.clone(),
998            kind: node.kind.clone(),
999            label: node.task_label.clone(),
1000            prompt_capsule: capsules_by_node.get(node_id).cloned(),
1001            trigger_ids: triggers_by_node.get(node_id).cloned().unwrap_or_default(),
1002            outgoing,
1003        });
1004    }
1005
1006    WorkflowBundlePreview {
1007        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1008        bundle_id: bundle.id.clone(),
1009        bundle_version: bundle.version.clone(),
1010        workflow_id: canonical.id.clone(),
1011        workflow_version: canonical.version,
1012        graph_digest: validation.graph_digest.clone(),
1013        validation,
1014        graph,
1015        mermaid,
1016        triggers: bundle.triggers.clone(),
1017        connectors: bundle.connectors.clone(),
1018        environment: bundle.environment.clone(),
1019        nodes,
1020        edges: sorted_edges(&canonical),
1021        metadata: bundle.metadata.clone(),
1022    }
1023}
1024
1025pub fn export_workflow_bundle_graph(
1026    bundle: &WorkflowBundle,
1027    validation: &WorkflowBundleValidationReport,
1028) -> WorkflowBundleGraphExport {
1029    let canonical = canonical_workflow_graph(&bundle.workflow);
1030    let mut nodes = Vec::new();
1031    let mut edges = Vec::new();
1032    let mut editable_fields = Vec::new();
1033    let capsules_by_node = capsules_by_node(bundle);
1034    let catchup_enabled = bundle.policy.catchup.mode != "none";
1035    let retry_can_dlq = bundle.policy.retry.max_attempts > 1;
1036
1037    for (index, connector) in bundle.connectors.iter().enumerate() {
1038        let node_fields = connector_editable_fields(index, connector);
1039        editable_fields.extend(node_fields.clone());
1040        nodes.push(WorkflowBundleGraphNode {
1041            id: connector_graph_id(&connector.id),
1042            node_type: "connector_call".to_string(),
1043            label: connector_label(connector),
1044            workflow_node_id: None,
1045            trigger_id: None,
1046            connector_id: Some(connector.id.clone()),
1047            editable_fields: node_fields,
1048            metadata: BTreeMap::from([
1049                (
1050                    "provider_id".to_string(),
1051                    serde_json::json!(connector.provider_id),
1052                ),
1053                ("scopes".to_string(), serde_json::json!(connector.scopes)),
1054            ]),
1055        });
1056    }
1057
1058    let catchup_fields = catchup_editable_fields();
1059    let retry_fields = retry_editable_fields();
1060    if catchup_enabled {
1061        let node_fields = catchup_fields;
1062        editable_fields.extend(node_fields.clone());
1063        nodes.push(WorkflowBundleGraphNode {
1064            id: catchup_graph_id(),
1065            node_type: "catchup".to_string(),
1066            label: "Catch up".to_string(),
1067            workflow_node_id: None,
1068            trigger_id: None,
1069            connector_id: None,
1070            editable_fields: node_fields,
1071            metadata: BTreeMap::from([(
1072                "mode".to_string(),
1073                serde_json::json!(bundle.policy.catchup.mode),
1074            )]),
1075        });
1076    } else {
1077        editable_fields.extend(catchup_fields);
1078    }
1079    if retry_can_dlq {
1080        let node_fields = retry_fields;
1081        editable_fields.extend(node_fields.clone());
1082        nodes.push(WorkflowBundleGraphNode {
1083            id: dlq_graph_id(),
1084            node_type: "dlq".to_string(),
1085            label: "Dead letter queue".to_string(),
1086            workflow_node_id: None,
1087            trigger_id: None,
1088            connector_id: None,
1089            editable_fields: node_fields,
1090            metadata: BTreeMap::from([(
1091                "max_attempts".to_string(),
1092                serde_json::json!(bundle.policy.retry.max_attempts),
1093            )]),
1094        });
1095    } else {
1096        editable_fields.extend(retry_fields);
1097    }
1098
1099    for (index, trigger) in bundle.triggers.iter().enumerate() {
1100        let node_fields = trigger_editable_fields(index, trigger);
1101        editable_fields.extend(node_fields.clone());
1102        nodes.push(WorkflowBundleGraphNode {
1103            id: trigger_graph_id(&trigger.id),
1104            node_type: "trigger".to_string(),
1105            label: trigger_label(trigger),
1106            workflow_node_id: trigger.node_id.clone(),
1107            trigger_id: Some(trigger.id.clone()),
1108            connector_id: None,
1109            editable_fields: node_fields,
1110            metadata: BTreeMap::from([
1111                ("kind".to_string(), serde_json::json!(trigger.kind)),
1112                ("provider".to_string(), serde_json::json!(trigger.provider)),
1113                ("events".to_string(), serde_json::json!(trigger.events)),
1114            ]),
1115        });
1116        if let Some(provider) = trigger.provider.as_deref() {
1117            if let Some(connector) = bundle
1118                .connectors
1119                .iter()
1120                .find(|connector| connector.provider_id == provider || connector.id == provider)
1121            {
1122                edges.push(WorkflowBundleGraphEdge {
1123                    from: connector_graph_id(&connector.id),
1124                    to: trigger_graph_id(&trigger.id),
1125                    label: Some("binds".to_string()),
1126                    branch: None,
1127                });
1128            }
1129        }
1130        let target = trigger
1131            .node_id
1132            .clone()
1133            .unwrap_or_else(|| canonical.entry.clone());
1134        if catchup_enabled {
1135            edges.push(WorkflowBundleGraphEdge {
1136                from: trigger_graph_id(&trigger.id),
1137                to: catchup_graph_id(),
1138                label: Some(bundle.policy.catchup.mode.clone()),
1139                branch: Some("catchup".to_string()),
1140            });
1141            edges.push(WorkflowBundleGraphEdge {
1142                from: catchup_graph_id(),
1143                to: workflow_graph_id(&target),
1144                label: Some("dispatch".to_string()),
1145                branch: None,
1146            });
1147        } else {
1148            edges.push(WorkflowBundleGraphEdge {
1149                from: trigger_graph_id(&trigger.id),
1150                to: workflow_graph_id(&target),
1151                label: Some("dispatch".to_string()),
1152                branch: None,
1153            });
1154        }
1155    }
1156
1157    for (node_id, node) in &canonical.nodes {
1158        let capsule_id = capsules_by_node.get(node_id);
1159        let node_fields = workflow_node_editable_fields(node_id, capsule_id);
1160        editable_fields.extend(node_fields.clone());
1161        nodes.push(WorkflowBundleGraphNode {
1162            id: workflow_graph_id(node_id),
1163            node_type: workflow_node_type(&node.kind),
1164            label: workflow_node_label(node_id, node),
1165            workflow_node_id: Some(node_id.clone()),
1166            trigger_id: None,
1167            connector_id: None,
1168            editable_fields: node_fields,
1169            metadata: BTreeMap::from([
1170                ("kind".to_string(), serde_json::json!(node.kind)),
1171                ("task_label".to_string(), serde_json::json!(node.task_label)),
1172                (
1173                    "prompt_capsule".to_string(),
1174                    serde_json::json!(capsule_id.cloned()),
1175                ),
1176            ]),
1177        });
1178    }
1179
1180    for edge in sorted_edges(&canonical) {
1181        edges.push(WorkflowBundleGraphEdge {
1182            from: workflow_graph_id(&edge.from),
1183            to: workflow_graph_id(&edge.to),
1184            label: edge.label.clone(),
1185            branch: edge.branch.clone(),
1186        });
1187    }
1188
1189    let outgoing: BTreeSet<&str> = canonical
1190        .edges
1191        .iter()
1192        .map(|edge| edge.from.as_str())
1193        .collect();
1194    for node_id in canonical.nodes.keys() {
1195        if !outgoing.contains(node_id.as_str()) {
1196            edges.push(WorkflowBundleGraphEdge {
1197                from: workflow_graph_id(node_id),
1198                to: terminal_completed_graph_id(),
1199                label: Some("completed".to_string()),
1200                branch: Some("completed".to_string()),
1201            });
1202        }
1203        if retry_can_dlq {
1204            edges.push(WorkflowBundleGraphEdge {
1205                from: workflow_graph_id(node_id),
1206                to: dlq_graph_id(),
1207                label: Some("retry exhausted".to_string()),
1208                branch: Some("failed".to_string()),
1209            });
1210        }
1211    }
1212
1213    nodes.push(WorkflowBundleGraphNode {
1214        id: terminal_completed_graph_id(),
1215        node_type: "terminal".to_string(),
1216        label: "Completed".to_string(),
1217        workflow_node_id: None,
1218        trigger_id: None,
1219        connector_id: None,
1220        editable_fields: Vec::new(),
1221        metadata: BTreeMap::from([("status".to_string(), serde_json::json!("completed"))]),
1222    });
1223    nodes.push(WorkflowBundleGraphNode {
1224        id: terminal_failed_graph_id(),
1225        node_type: "terminal".to_string(),
1226        label: "Failed".to_string(),
1227        workflow_node_id: None,
1228        trigger_id: None,
1229        connector_id: None,
1230        editable_fields: Vec::new(),
1231        metadata: BTreeMap::from([("status".to_string(), serde_json::json!("failed"))]),
1232    });
1233    if retry_can_dlq {
1234        edges.push(WorkflowBundleGraphEdge {
1235            from: dlq_graph_id(),
1236            to: terminal_failed_graph_id(),
1237            label: Some("failed".to_string()),
1238            branch: Some("failed".to_string()),
1239        });
1240    }
1241
1242    nodes.sort_by(|left, right| left.id.cmp(&right.id));
1243    edges.sort_by(|left, right| {
1244        (&left.from, &left.to, &left.branch, &left.label).cmp(&(
1245            &right.from,
1246            &right.to,
1247            &right.branch,
1248            &right.label,
1249        ))
1250    });
1251    editable_fields.sort_by(|left, right| left.id.cmp(&right.id));
1252
1253    let diagnostics = validation
1254        .errors
1255        .iter()
1256        .chain(validation.warnings.iter())
1257        .map(|diagnostic| WorkflowBundleGraphDiagnostic {
1258            severity: diagnostic.severity.clone(),
1259            path: diagnostic.path.clone(),
1260            message: diagnostic.message.clone(),
1261            node_id: diagnostic.node_id.clone(),
1262            graph_node_id: diagnostic.node_id.as_deref().map(workflow_graph_id),
1263        })
1264        .collect::<Vec<_>>();
1265    let mermaid = render_workflow_bundle_mermaid(&nodes, &edges);
1266
1267    WorkflowBundleGraphExport {
1268        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1269        graph_id: canonical.id,
1270        graph_digest: validation.graph_digest.clone(),
1271        nodes,
1272        edges,
1273        diagnostics,
1274        editable_fields,
1275        mermaid,
1276    }
1277}
1278
1279pub fn run_workflow_bundle(
1280    bundle: &WorkflowBundle,
1281    request: WorkflowBundleRunRequest,
1282) -> Result<WorkflowBundleRunReceipt, WorkflowBundleValidationReport> {
1283    let validation = validate_workflow_bundle(bundle);
1284    if !validation.valid {
1285        return Err(validation);
1286    }
1287
1288    let canonical = canonical_workflow_graph(&bundle.workflow);
1289    let trigger_id = match request.trigger_id {
1290        Some(trigger_id)
1291            if !bundle
1292                .triggers
1293                .iter()
1294                .any(|trigger| trigger.id == trigger_id) =>
1295        {
1296            let mut report = validation;
1297            push_error(
1298                &mut report,
1299                "trigger_id",
1300                format!("unknown trigger id: {trigger_id}"),
1301                None,
1302            );
1303            report.valid = false;
1304            return Err(report);
1305        }
1306        Some(trigger_id) => Some(trigger_id),
1307        None => bundle.triggers.first().map(|trigger| trigger.id.clone()),
1308    };
1309    let mut event_ids = bundle.receipts.event_ids.clone();
1310    if let Some(event_id) = request.event_id {
1311        if !event_ids.contains(&event_id) {
1312            event_ids.push(event_id);
1313        }
1314    }
1315    let run_id = bundle
1316        .receipts
1317        .run_id
1318        .clone()
1319        .unwrap_or_else(|| default_run_id(bundle, &validation.graph_digest));
1320    let capsules_by_node = capsules_by_node(bundle);
1321    let executed_nodes = execution_order(&canonical)
1322        .into_iter()
1323        .map(|node_id| {
1324            let node = canonical
1325                .nodes
1326                .get(&node_id)
1327                .expect("execution order only contains known nodes");
1328            WorkflowBundleRunNodeReceipt {
1329                node_id: node_id.clone(),
1330                kind: node.kind.clone(),
1331                prompt_capsule: capsules_by_node.get(&node_id).cloned(),
1332                status: "completed".to_string(),
1333            }
1334        })
1335        .collect();
1336
1337    Ok(WorkflowBundleRunReceipt {
1338        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1339        receipt_type: WORKFLOW_BUNDLE_RECEIPT_TYPE.to_string(),
1340        bundle_id: bundle.id.clone(),
1341        bundle_version: bundle.version.clone(),
1342        workflow_id: canonical.id,
1343        workflow_version: canonical.version,
1344        graph_digest: validation.graph_digest,
1345        run_id,
1346        trigger_id,
1347        event_ids,
1348        status: "completed".to_string(),
1349        executed_nodes,
1350        policy: bundle.policy.clone(),
1351        connectors: bundle.connectors.clone(),
1352        environment: bundle.environment.clone(),
1353    })
1354}
1355
1356fn canonical_workflow_graph(graph: &WorkflowGraph) -> WorkflowGraph {
1357    let mut canonical = graph.clone();
1358    if canonical.type_name.is_empty() {
1359        canonical.type_name = "workflow_graph".to_string();
1360    }
1361    if canonical.version == 0 {
1362        canonical.version = 1;
1363    }
1364    if canonical.entry.is_empty() {
1365        canonical.entry = canonical.nodes.keys().next().cloned().unwrap_or_default();
1366    }
1367    for (node_id, node) in &mut canonical.nodes {
1368        if node.id.is_none() {
1369            node.id = Some(node_id.clone());
1370        }
1371        if node.kind.is_empty() {
1372            node.kind = "stage".to_string();
1373        }
1374        if node.retry_policy.max_attempts == 0 {
1375            node.retry_policy.max_attempts = 1;
1376        }
1377    }
1378    canonical.edges = sorted_edges(&canonical);
1379    canonical
1380}
1381
1382fn sorted_edges(graph: &WorkflowGraph) -> Vec<WorkflowEdge> {
1383    let mut edges = graph.edges.clone();
1384    edges.sort_by(|left, right| {
1385        (
1386            &left.from,
1387            &left.to,
1388            left.branch.as_deref(),
1389            left.label.as_deref(),
1390        )
1391            .cmp(&(
1392                &right.from,
1393                &right.to,
1394                right.branch.as_deref(),
1395                right.label.as_deref(),
1396            ))
1397    });
1398    edges
1399}
1400
1401fn validate_bundle_identity(
1402    bundle: &WorkflowBundle,
1403    graph: &WorkflowGraph,
1404    report: &mut WorkflowBundleValidationReport,
1405) {
1406    if bundle.schema_version != WORKFLOW_BUNDLE_SCHEMA_VERSION {
1407        push_error(
1408            report,
1409            "schema_version",
1410            format!(
1411                "unsupported schema_version {}; expected {}",
1412                bundle.schema_version, WORKFLOW_BUNDLE_SCHEMA_VERSION
1413            ),
1414            None,
1415        );
1416    }
1417    if bundle.id.trim().is_empty() {
1418        push_error(report, "id", "bundle id is required", None);
1419    }
1420    if bundle.version.trim().is_empty() {
1421        push_error(report, "version", "bundle version is required", None);
1422    }
1423    if graph.id.trim().is_empty() {
1424        push_error(
1425            report,
1426            "workflow.id",
1427            "workflow id is required for portable bundles",
1428            None,
1429        );
1430    }
1431    if graph.nodes.is_empty() {
1432        push_error(
1433            report,
1434            "workflow.nodes",
1435            "workflow must contain nodes",
1436            None,
1437        );
1438    }
1439    for (node_id, node) in &graph.nodes {
1440        if node_id.trim().is_empty() {
1441            push_error(report, "workflow.nodes", "node id is required", None);
1442        }
1443        if node.id.as_deref().is_some_and(|id| id != node_id) {
1444            push_error(
1445                report,
1446                format!("workflow.nodes.{node_id}.id"),
1447                "node id field must match its map key",
1448                Some(node_id.clone()),
1449            );
1450        }
1451    }
1452}
1453
1454fn validate_manifest_contract(
1455    bundle: &WorkflowBundle,
1456    report: &mut WorkflowBundleValidationReport,
1457) {
1458    validate_relative_path(
1459        report,
1460        "entrypoint",
1461        &bundle.entrypoint,
1462        "entrypoint is required",
1463    );
1464    if bundle.transitive_modules.is_empty() {
1465        push_error(
1466            report,
1467            "transitive_modules",
1468            "at least one transitive module entry is required",
1469            None,
1470        );
1471    }
1472    let mut module_paths = BTreeSet::new();
1473    for (index, module) in bundle.transitive_modules.iter().enumerate() {
1474        let path = format!("transitive_modules[{index}]");
1475        validate_relative_path(
1476            report,
1477            format!("{path}.path"),
1478            &module.path,
1479            "module path is required",
1480        );
1481        if !module.path.as_os_str().is_empty() && !module_paths.insert(path_sort_key(&module.path))
1482        {
1483            push_error(
1484                report,
1485                format!("{path}.path"),
1486                format!(
1487                    "duplicate transitive module path: {}",
1488                    module.path.display()
1489                ),
1490                None,
1491            );
1492        }
1493        validate_blake3_hash(
1494            report,
1495            format!("{path}.source_hash_blake3"),
1496            &module.source_hash_blake3,
1497            true,
1498        );
1499        validate_blake3_hash(
1500            report,
1501            format!("{path}.harnbc_hash_blake3"),
1502            &module.harnbc_hash_blake3,
1503            true,
1504        );
1505    }
1506    if bundle.stdlib_version.trim().is_empty() {
1507        push_error(report, "stdlib_version", "stdlib_version is required", None);
1508    }
1509    if bundle.harn_version.trim().is_empty() {
1510        push_error(report, "harn_version", "harn_version is required", None);
1511    }
1512    validate_blake3_hash(
1513        report,
1514        "provider_catalog_hash",
1515        &bundle.provider_catalog_hash,
1516        true,
1517    );
1518
1519    let mut tool_names = BTreeSet::new();
1520    for (index, tool) in bundle.tool_manifest.iter().enumerate() {
1521        let path = format!("tool_manifest[{index}]");
1522        if tool.name.trim().is_empty() {
1523            push_error(
1524                report,
1525                format!("{path}.name"),
1526                "tool manifest entry name is required",
1527                None,
1528            );
1529        } else if !tool_names.insert(tool.name.clone()) {
1530            push_error(
1531                report,
1532                format!("{path}.name"),
1533                format!("duplicate tool manifest entry: {}", tool.name),
1534                None,
1535            );
1536        }
1537        if let Some(hash) = tool.schema_hash_blake3.as_deref() {
1538            validate_blake3_hash(report, format!("{path}.schema_hash_blake3"), hash, true);
1539        }
1540    }
1541
1542    if bundle.sbom.format.trim().is_empty() {
1543        push_error(report, "sbom.format", "SBOM format is required", None);
1544    }
1545    if bundle.sbom.version.trim().is_empty() {
1546        push_error(report, "sbom.version", "SBOM version is required", None);
1547    }
1548    let mut sbom_packages = BTreeSet::new();
1549    for (index, package) in bundle.sbom.packages.iter().enumerate() {
1550        let path = format!("sbom.packages[{index}]");
1551        if package.name.trim().is_empty() {
1552            push_error(
1553                report,
1554                format!("{path}.name"),
1555                "SBOM package name is required",
1556                None,
1557            );
1558        } else if !sbom_packages.insert(package.name.clone()) {
1559            push_error(
1560                report,
1561                format!("{path}.name"),
1562                format!("duplicate SBOM package: {}", package.name),
1563                None,
1564            );
1565        }
1566        if let Some(hash) = package.package_hash_blake3.as_deref() {
1567            validate_blake3_hash(report, format!("{path}.package_hash_blake3"), hash, true);
1568        }
1569    }
1570    for (index, relationship) in bundle.sbom.relationships.iter().enumerate() {
1571        let path = format!("sbom.relationships[{index}]");
1572        if relationship.from.trim().is_empty() {
1573            push_error(
1574                report,
1575                format!("{path}.from"),
1576                "SBOM relationship source is required",
1577                None,
1578            );
1579        }
1580        if relationship.to.trim().is_empty() {
1581            push_error(
1582                report,
1583                format!("{path}.to"),
1584                "SBOM relationship target is required",
1585                None,
1586            );
1587        }
1588        if relationship.relationship_type.trim().is_empty() {
1589            push_error(
1590                report,
1591                format!("{path}.relationship_type"),
1592                "SBOM relationship type is required",
1593                None,
1594            );
1595        }
1596    }
1597
1598    if let Some(parent_id) = bundle.parent_trust_record_id.as_deref() {
1599        if parent_id.trim().is_empty() {
1600            push_error(
1601                report,
1602                "parent_trust_record_id",
1603                "parent_trust_record_id cannot be empty when present",
1604                None,
1605            );
1606        }
1607    }
1608    if let Some(signature) = &bundle.signature {
1609        if signature.algorithm.trim().is_empty() {
1610            push_error(
1611                report,
1612                "signature.algorithm",
1613                "signature algorithm is required",
1614                None,
1615            );
1616        } else if signature.algorithm != "ed25519" {
1617            push_error(
1618                report,
1619                "signature.algorithm",
1620                "signature algorithm must be ed25519",
1621                None,
1622            );
1623        }
1624        if signature.public_key.trim().is_empty() {
1625            push_error(
1626                report,
1627                "signature.public_key",
1628                "signature public_key is required",
1629                None,
1630            );
1631        }
1632        if signature.signature.trim().is_empty() {
1633            push_error(
1634                report,
1635                "signature.signature",
1636                "signature value is required",
1637                None,
1638            );
1639        }
1640        validate_blake3_hash(
1641            report,
1642            "signature.manifest_hash_blake3",
1643            &signature.manifest_hash_blake3,
1644            true,
1645        );
1646    }
1647}
1648
1649fn validate_relative_path(
1650    report: &mut WorkflowBundleValidationReport,
1651    path: impl Into<String>,
1652    value: &Path,
1653    empty_message: &str,
1654) {
1655    let path = path.into();
1656    if value.as_os_str().is_empty() {
1657        push_error(report, path, empty_message, None);
1658        return;
1659    }
1660    if normalize_archive_path(value).is_err() {
1661        push_error(
1662            report,
1663            path,
1664            format!("path must be relative and contained: {}", value.display()),
1665            None,
1666        );
1667    }
1668}
1669
1670fn validate_blake3_hash(
1671    report: &mut WorkflowBundleValidationReport,
1672    path: impl Into<String>,
1673    value: &str,
1674    required: bool,
1675) {
1676    let path = path.into();
1677    if value.trim().is_empty() {
1678        if required {
1679            push_error(report, path, "BLAKE3 hash is required", None);
1680        }
1681        return;
1682    }
1683    let Some(hex) = value.strip_prefix("blake3:") else {
1684        push_error(report, path, "BLAKE3 hash must use blake3:<hex>", None);
1685        return;
1686    };
1687    if hex.len() != 64
1688        || !hex
1689            .bytes()
1690            .all(|byte| byte.is_ascii_digit() || matches!(byte, b'a'..=b'f'))
1691    {
1692        push_error(
1693            report,
1694            path,
1695            "BLAKE3 hash must contain 64 lowercase hex digits",
1696            None,
1697        );
1698    }
1699}
1700
1701fn validate_triggers(
1702    bundle: &WorkflowBundle,
1703    graph: &WorkflowGraph,
1704    report: &mut WorkflowBundleValidationReport,
1705) {
1706    if bundle.triggers.is_empty() {
1707        push_warning(report, "triggers", "bundle declares no triggers", None);
1708    }
1709    let mut ids = BTreeSet::new();
1710    for (index, trigger) in bundle.triggers.iter().enumerate() {
1711        let path = format!("triggers[{index}]");
1712        if trigger.id.trim().is_empty() {
1713            push_error(report, format!("{path}.id"), "trigger id is required", None);
1714        } else if !ids.insert(trigger.id.clone()) {
1715            push_error(
1716                report,
1717                format!("{path}.id"),
1718                format!("duplicate trigger id: {}", trigger.id),
1719                None,
1720            );
1721        }
1722        match trigger.kind.as_str() {
1723            "github" => {
1724                if trigger.provider.as_deref() != Some("github") {
1725                    push_error(
1726                        report,
1727                        format!("{path}.provider"),
1728                        "github triggers require provider=\"github\"",
1729                        None,
1730                    );
1731                }
1732                if trigger.events.is_empty() {
1733                    push_error(
1734                        report,
1735                        format!("{path}.events"),
1736                        "github triggers require at least one event",
1737                        None,
1738                    );
1739                }
1740            }
1741            "cron" if trigger.schedule.is_none() => push_error(
1742                report,
1743                format!("{path}.schedule"),
1744                "cron triggers require schedule",
1745                None,
1746            ),
1747            "delay" if trigger.delay.is_none() => push_error(
1748                report,
1749                format!("{path}.delay"),
1750                "delay triggers require delay",
1751                None,
1752            ),
1753            "webhook" if trigger.webhook_path.is_none() => push_error(
1754                report,
1755                format!("{path}.webhook_path"),
1756                "webhook triggers require webhook_path",
1757                None,
1758            ),
1759            "mcp" if trigger.mcp_tool.is_none() => push_error(
1760                report,
1761                format!("{path}.mcp_tool"),
1762                "mcp triggers require mcp_tool",
1763                None,
1764            ),
1765            "manual" => {}
1766            "" => push_error(
1767                report,
1768                format!("{path}.kind"),
1769                "trigger kind is required",
1770                None,
1771            ),
1772            other
1773                if !matches!(
1774                    other,
1775                    "github" | "cron" | "delay" | "webhook" | "mcp" | "manual"
1776                ) =>
1777            {
1778                push_error(
1779                    report,
1780                    format!("{path}.kind"),
1781                    format!("unsupported trigger kind: {other}"),
1782                    None,
1783                );
1784            }
1785            _ => {}
1786        }
1787        if let Some(node_id) = trigger.node_id.as_deref() {
1788            if !graph.nodes.contains_key(node_id) {
1789                push_error(
1790                    report,
1791                    format!("{path}.node_id"),
1792                    format!("trigger references unknown node: {node_id}"),
1793                    Some(node_id.to_string()),
1794                );
1795            }
1796        }
1797    }
1798}
1799
1800fn validate_prompt_capsules(
1801    bundle: &WorkflowBundle,
1802    graph: &WorkflowGraph,
1803    report: &mut WorkflowBundleValidationReport,
1804) {
1805    let trigger_ids: BTreeSet<&str> = bundle
1806        .triggers
1807        .iter()
1808        .map(|trigger| trigger.id.as_str())
1809        .collect();
1810    let mut node_refs = BTreeSet::new();
1811    for (key, capsule) in &bundle.prompt_capsules {
1812        let path = format!("prompt_capsules.{key}");
1813        if capsule.id.trim().is_empty() {
1814            push_error(
1815                report,
1816                format!("{path}.id"),
1817                "prompt capsule id is required",
1818                None,
1819            );
1820        } else if capsule.id != *key {
1821            push_error(
1822                report,
1823                format!("{path}.id"),
1824                "prompt capsule id must match its map key",
1825                None,
1826            );
1827        }
1828        if capsule.prompt.trim().is_empty() {
1829            push_error(
1830                report,
1831                format!("{path}.prompt"),
1832                "prompt capsule prompt is required",
1833                Some(capsule.node_id.clone()),
1834            );
1835        }
1836        if !graph.nodes.contains_key(&capsule.node_id) {
1837            push_error(
1838                report,
1839                format!("{path}.node_id"),
1840                format!(
1841                    "prompt capsule references unknown node: {}",
1842                    capsule.node_id
1843                ),
1844                Some(capsule.node_id.clone()),
1845            );
1846        }
1847        if !capsule.node_id.is_empty() && !node_refs.insert(capsule.node_id.clone()) {
1848            push_error(
1849                report,
1850                format!("{path}.node_id"),
1851                format!("multiple prompt capsules target node {}", capsule.node_id),
1852                Some(capsule.node_id.clone()),
1853            );
1854        }
1855        if let Some(trigger_id) = capsule.trigger_id.as_deref() {
1856            if !trigger_ids.contains(trigger_id) {
1857                push_error(
1858                    report,
1859                    format!("{path}.trigger_id"),
1860                    format!("prompt capsule references unknown trigger: {trigger_id}"),
1861                    Some(capsule.node_id.clone()),
1862                );
1863            }
1864        }
1865    }
1866}
1867
1868fn validate_policy(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1869    if !matches!(
1870        bundle.policy.autonomy_tier.as_str(),
1871        "shadow" | "suggest" | "act_with_approval" | "act_auto"
1872    ) {
1873        push_error(
1874            report,
1875            "policy.autonomy_tier",
1876            "autonomy_tier must be shadow, suggest, act_with_approval, or act_auto",
1877            None,
1878        );
1879    }
1880    if bundle.policy.retry.max_attempts == 0 {
1881        push_error(
1882            report,
1883            "policy.retry.max_attempts",
1884            "retry.max_attempts must be at least 1",
1885            None,
1886        );
1887    }
1888    if !matches!(
1889        bundle.policy.catchup.mode.as_str(),
1890        "none" | "latest" | "all"
1891    ) {
1892        push_error(
1893            report,
1894            "policy.catchup.mode",
1895            "catchup.mode must be none, latest, or all",
1896            None,
1897        );
1898    }
1899}
1900
1901fn validate_connectors(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1902    let mut ids = BTreeSet::new();
1903    let provider_ids: BTreeSet<&str> = bundle
1904        .connectors
1905        .iter()
1906        .map(|connector| connector.provider_id.as_str())
1907        .collect();
1908    for (index, connector) in bundle.connectors.iter().enumerate() {
1909        let path = format!("connectors[{index}]");
1910        if connector.id.trim().is_empty() {
1911            push_error(
1912                report,
1913                format!("{path}.id"),
1914                "connector id is required",
1915                None,
1916            );
1917        } else if !ids.insert(connector.id.clone()) {
1918            push_error(
1919                report,
1920                format!("{path}.id"),
1921                format!("duplicate connector id: {}", connector.id),
1922                None,
1923            );
1924        }
1925        if connector.provider_id.trim().is_empty() {
1926            push_error(
1927                report,
1928                format!("{path}.provider_id"),
1929                "connector provider_id is required",
1930                None,
1931            );
1932        }
1933    }
1934    for trigger in &bundle.triggers {
1935        if let Some(provider) = trigger.provider.as_deref() {
1936            if !provider_ids.contains(provider) {
1937                push_warning(
1938                    report,
1939                    "connectors",
1940                    format!(
1941                        "trigger {} references provider {provider} with no connector requirement",
1942                        trigger.id
1943                    ),
1944                    trigger.node_id.clone(),
1945                );
1946            }
1947        }
1948    }
1949}
1950
1951fn validate_environment(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1952    if !matches!(
1953        bundle.environment.worktree_policy.as_str(),
1954        "reuse_current" | "new_worktree" | "host_managed"
1955    ) {
1956        push_error(
1957            report,
1958            "environment.worktree_policy",
1959            "worktree_policy must be reuse_current, new_worktree, or host_managed",
1960            None,
1961        );
1962    }
1963}
1964
1965fn workflow_diagnostic_node_id(message: &str, graph: &WorkflowGraph) -> Option<String> {
1966    for prefix in [
1967        "node is unreachable: ",
1968        "edge.from references unknown node: ",
1969        "edge.to references unknown node: ",
1970        "entry node does not exist: ",
1971    ] {
1972        if let Some(node_id) = message.strip_prefix(prefix) {
1973            return Some(node_id.to_string());
1974        }
1975    }
1976    if let Some(rest) = message.strip_prefix("node ") {
1977        if let Some((node_id, _)) = rest.split_once(':') {
1978            return Some(node_id.to_string());
1979        }
1980    }
1981    graph
1982        .nodes
1983        .keys()
1984        .find(|node_id| message.contains(&format!("node {node_id}:")))
1985        .cloned()
1986}
1987
1988fn workflow_graph_id(node_id: &str) -> String {
1989    format!("node/{node_id}")
1990}
1991
1992fn trigger_graph_id(trigger_id: &str) -> String {
1993    format!("trigger/{trigger_id}")
1994}
1995
1996fn connector_graph_id(connector_id: &str) -> String {
1997    format!("connector/{connector_id}")
1998}
1999
2000fn catchup_graph_id() -> String {
2001    "policy/catchup".to_string()
2002}
2003
2004fn dlq_graph_id() -> String {
2005    "policy/dlq".to_string()
2006}
2007
2008fn terminal_completed_graph_id() -> String {
2009    "terminal/completed".to_string()
2010}
2011
2012fn terminal_failed_graph_id() -> String {
2013    "terminal/failed".to_string()
2014}
2015
2016fn workflow_node_type(kind: &str) -> String {
2017    match kind {
2018        "action" => "action",
2019        "stage" | "agent" => "agent",
2020        "subagent" | "worker" => "subagent",
2021        "wait" | "waitpoint" | "delay" => "wait",
2022        "approval" | "hitl" => "approval",
2023        "connector" | "connector_call" => "connector_call",
2024        "notification" | "notify" => "notification",
2025        "terminal" | "success" | "failure" => "terminal",
2026        other if other.trim().is_empty() => "agent",
2027        other => other,
2028    }
2029    .to_string()
2030}
2031
2032fn workflow_node_label(node_id: &str, node: &super::WorkflowNode) -> String {
2033    node.task_label
2034        .clone()
2035        .or_else(|| node.prompt.clone())
2036        .map(|label| label.trim().to_string())
2037        .filter(|label| !label.is_empty())
2038        .unwrap_or_else(|| node_id.to_string())
2039}
2040
2041fn trigger_label(trigger: &WorkflowBundleTrigger) -> String {
2042    if !trigger.events.is_empty() {
2043        format!("{}: {}", trigger.kind, trigger.events.join(", "))
2044    } else if let Some(schedule) = trigger.schedule.as_deref() {
2045        format!("cron: {schedule}")
2046    } else if let Some(delay) = trigger.delay.as_deref() {
2047        format!("delay: {delay}")
2048    } else {
2049        trigger.id.clone()
2050    }
2051}
2052
2053fn connector_label(connector: &ConnectorRequirement) -> String {
2054    if connector.provider_id.is_empty() || connector.provider_id == connector.id {
2055        connector.id.clone()
2056    } else {
2057        format!("{} ({})", connector.id, connector.provider_id)
2058    }
2059}
2060
2061fn editable_field(
2062    id: impl Into<String>,
2063    label: impl Into<String>,
2064    json_pointer: impl Into<String>,
2065    value_type: impl Into<String>,
2066    required: bool,
2067    enum_values: &[&str],
2068) -> WorkflowBundleEditableField {
2069    WorkflowBundleEditableField {
2070        id: id.into(),
2071        label: label.into(),
2072        json_pointer: json_pointer.into(),
2073        value_type: value_type.into(),
2074        required,
2075        enum_values: enum_values
2076            .iter()
2077            .map(|value| (*value).to_string())
2078            .collect(),
2079    }
2080}
2081
2082fn json_pointer_segment(value: &str) -> String {
2083    value.replace('~', "~0").replace('/', "~1")
2084}
2085
2086fn trigger_editable_fields(
2087    index: usize,
2088    trigger: &WorkflowBundleTrigger,
2089) -> Vec<WorkflowBundleEditableField> {
2090    let base = format!("/triggers/{index}");
2091    let mut fields = vec![
2092        editable_field(
2093            format!("trigger.{}.kind", trigger.id),
2094            "Trigger kind",
2095            format!("{base}/kind"),
2096            "enum",
2097            true,
2098            &["github", "cron", "delay", "manual", "webhook", "mcp"],
2099        ),
2100        editable_field(
2101            format!("trigger.{}.node_id", trigger.id),
2102            "Target node",
2103            format!("{base}/node_id"),
2104            "string",
2105            false,
2106            &[],
2107        ),
2108    ];
2109    if trigger.provider.is_some() || trigger.kind == "github" {
2110        fields.push(editable_field(
2111            format!("trigger.{}.provider", trigger.id),
2112            "Provider",
2113            format!("{base}/provider"),
2114            "string",
2115            trigger.kind == "github",
2116            &[],
2117        ));
2118    }
2119    for (field, label, value_type) in [
2120        ("events", "Events", "list"),
2121        ("schedule", "Schedule", "string"),
2122        ("delay", "Delay", "string"),
2123        ("webhook_path", "Webhook path", "string"),
2124        ("mcp_tool", "MCP tool", "string"),
2125        ("resume_key", "Resume key", "string"),
2126        ("metadata", "Metadata", "object"),
2127    ] {
2128        fields.push(editable_field(
2129            format!("trigger.{}.{}", trigger.id, field),
2130            label,
2131            format!("{base}/{field}"),
2132            value_type,
2133            false,
2134            &[],
2135        ));
2136    }
2137    fields
2138}
2139
2140fn workflow_node_editable_fields(
2141    node_id: &str,
2142    capsule_id: Option<&String>,
2143) -> Vec<WorkflowBundleEditableField> {
2144    let escaped_node = json_pointer_segment(node_id);
2145    let mut fields = vec![
2146        editable_field(
2147            format!("workflow.{node_id}.task_label"),
2148            "Task label",
2149            format!("/workflow/nodes/{escaped_node}/task_label"),
2150            "string",
2151            false,
2152            &[],
2153        ),
2154        editable_field(
2155            format!("workflow.{node_id}.prompt"),
2156            "Prompt",
2157            format!("/workflow/nodes/{escaped_node}/prompt"),
2158            "string",
2159            false,
2160            &[],
2161        ),
2162        editable_field(
2163            format!("workflow.{node_id}.system"),
2164            "System prompt",
2165            format!("/workflow/nodes/{escaped_node}/system"),
2166            "string",
2167            false,
2168            &[],
2169        ),
2170        editable_field(
2171            format!("workflow.{node_id}.model_policy"),
2172            "Model policy",
2173            format!("/workflow/nodes/{escaped_node}/model_policy"),
2174            "object",
2175            false,
2176            &[],
2177        ),
2178        editable_field(
2179            format!("workflow.{node_id}.tools"),
2180            "Tool policy",
2181            format!("/workflow/nodes/{escaped_node}/tools"),
2182            "any",
2183            false,
2184            &[],
2185        ),
2186        editable_field(
2187            format!("workflow.{node_id}.capability_policy"),
2188            "Capability policy",
2189            format!("/workflow/nodes/{escaped_node}/capability_policy"),
2190            "object",
2191            false,
2192            &[],
2193        ),
2194        editable_field(
2195            format!("workflow.{node_id}.approval_policy"),
2196            "Approval policy",
2197            format!("/workflow/nodes/{escaped_node}/approval_policy"),
2198            "object",
2199            false,
2200            &[],
2201        ),
2202        editable_field(
2203            format!("workflow.{node_id}.retry_policy"),
2204            "Retry policy",
2205            format!("/workflow/nodes/{escaped_node}/retry_policy"),
2206            "object",
2207            false,
2208            &[],
2209        ),
2210    ];
2211    if let Some(capsule_id) = capsule_id {
2212        let escaped_capsule = json_pointer_segment(capsule_id);
2213        fields.extend([
2214            editable_field(
2215                format!("prompt_capsule.{capsule_id}.prompt"),
2216                "Prompt capsule",
2217                format!("/prompt_capsules/{escaped_capsule}/prompt"),
2218                "string",
2219                true,
2220                &[],
2221            ),
2222            editable_field(
2223                format!("prompt_capsule.{capsule_id}.system"),
2224                "Prompt capsule system",
2225                format!("/prompt_capsules/{escaped_capsule}/system"),
2226                "string",
2227                false,
2228                &[],
2229            ),
2230            editable_field(
2231                format!("prompt_capsule.{capsule_id}.context"),
2232                "Prompt capsule context",
2233                format!("/prompt_capsules/{escaped_capsule}/context"),
2234                "object",
2235                false,
2236                &[],
2237            ),
2238            editable_field(
2239                format!("prompt_capsule.{capsule_id}.trigger_id"),
2240                "Prompt capsule trigger",
2241                format!("/prompt_capsules/{escaped_capsule}/trigger_id"),
2242                "string",
2243                false,
2244                &[],
2245            ),
2246        ]);
2247    }
2248    fields
2249}
2250
2251fn connector_editable_fields(
2252    index: usize,
2253    connector: &ConnectorRequirement,
2254) -> Vec<WorkflowBundleEditableField> {
2255    let base = format!("/connectors/{index}");
2256    [
2257        ("id", "Connector id", "string", true),
2258        ("provider_id", "Provider id", "string", true),
2259        ("scopes", "Scopes", "list", false),
2260        ("setup_required", "Setup required", "bool", false),
2261        ("status_required", "Status required", "bool", false),
2262    ]
2263    .into_iter()
2264    .map(|(field, label, value_type, required)| {
2265        editable_field(
2266            format!("connector.{}.{}", connector.id, field),
2267            label,
2268            format!("{base}/{field}"),
2269            value_type,
2270            required,
2271            &[],
2272        )
2273    })
2274    .collect()
2275}
2276
2277fn retry_editable_fields() -> Vec<WorkflowBundleEditableField> {
2278    vec![
2279        editable_field(
2280            "policy.retry.max_attempts",
2281            "Retry attempts",
2282            "/policy/retry/max_attempts",
2283            "integer",
2284            true,
2285            &[],
2286        ),
2287        editable_field(
2288            "policy.retry.backoff",
2289            "Retry backoff",
2290            "/policy/retry/backoff",
2291            "string",
2292            true,
2293            &[],
2294        ),
2295    ]
2296}
2297
2298fn catchup_editable_fields() -> Vec<WorkflowBundleEditableField> {
2299    vec![
2300        editable_field(
2301            "policy.catchup.mode",
2302            "Catchup mode",
2303            "/policy/catchup/mode",
2304            "enum",
2305            true,
2306            &["none", "latest", "all"],
2307        ),
2308        editable_field(
2309            "policy.catchup.max_events",
2310            "Catchup max events",
2311            "/policy/catchup/max_events",
2312            "integer",
2313            false,
2314            &[],
2315        ),
2316    ]
2317}
2318
2319fn render_workflow_bundle_mermaid(
2320    nodes: &[WorkflowBundleGraphNode],
2321    edges: &[WorkflowBundleGraphEdge],
2322) -> String {
2323    let mut lines = vec!["flowchart TD".to_string()];
2324    for node in nodes {
2325        lines.push(format!(
2326            "  {}[\"{}\"]",
2327            mermaid_id(&node.id),
2328            mermaid_label(&format!("{}: {}", node.node_type, node.label))
2329        ));
2330    }
2331    for edge in edges {
2332        let label = edge
2333            .label
2334            .as_deref()
2335            .or(edge.branch.as_deref())
2336            .map(mermaid_label);
2337        match label {
2338            Some(label) if !label.is_empty() => lines.push(format!(
2339                "  {} -->|{}| {}",
2340                mermaid_id(&edge.from),
2341                label,
2342                mermaid_id(&edge.to)
2343            )),
2344            _ => lines.push(format!(
2345                "  {} --> {}",
2346                mermaid_id(&edge.from),
2347                mermaid_id(&edge.to)
2348            )),
2349        }
2350    }
2351    lines.join("\n")
2352}
2353
2354fn mermaid_id(value: &str) -> String {
2355    let digest = Sha256::digest(value.as_bytes());
2356    let suffix = digest
2357        .iter()
2358        .take(4)
2359        .map(|byte| format!("{byte:02x}"))
2360        .collect::<String>();
2361    let mut out = format!("n_{suffix}_");
2362    for ch in value.chars() {
2363        if ch.is_ascii_alphanumeric() {
2364            out.push(ch);
2365        } else {
2366            out.push('_');
2367        }
2368    }
2369    out
2370}
2371
2372fn mermaid_label(value: &str) -> String {
2373    value
2374        .replace('\\', "\\\\")
2375        .replace('"', "\\\"")
2376        .replace('\n', " ")
2377}
2378
2379fn triggers_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, Vec<String>> {
2380    let mut by_node: BTreeMap<String, Vec<String>> = BTreeMap::new();
2381    for trigger in &bundle.triggers {
2382        if let Some(node_id) = trigger.node_id.as_ref() {
2383            by_node
2384                .entry(node_id.clone())
2385                .or_default()
2386                .push(trigger.id.clone());
2387        }
2388    }
2389    by_node
2390}
2391
2392fn capsules_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, String> {
2393    bundle
2394        .prompt_capsules
2395        .iter()
2396        .map(|(id, capsule)| (capsule.node_id.clone(), id.clone()))
2397        .collect()
2398}
2399
2400fn execution_order(graph: &WorkflowGraph) -> Vec<String> {
2401    let outgoing =
2402        graph
2403            .edges
2404            .iter()
2405            .fold(BTreeMap::<String, Vec<String>>::new(), |mut acc, edge| {
2406                acc.entry(edge.from.clone())
2407                    .or_default()
2408                    .push(edge.to.clone());
2409                acc
2410            });
2411    let mut seen = BTreeSet::new();
2412    let mut queue = VecDeque::from([graph.entry.clone()]);
2413    let mut order = Vec::new();
2414    while let Some(node_id) = queue.pop_front() {
2415        if !graph.nodes.contains_key(&node_id) || !seen.insert(node_id.clone()) {
2416            continue;
2417        }
2418        order.push(node_id.clone());
2419        if let Some(next) = outgoing.get(&node_id) {
2420            let mut next = next.clone();
2421            next.sort();
2422            for child in next {
2423                queue.push_back(child);
2424            }
2425        }
2426    }
2427    order
2428}
2429
2430fn default_run_id(bundle: &WorkflowBundle, graph_digest: &str) -> String {
2431    let suffix = graph_digest
2432        .strip_prefix("sha256:")
2433        .unwrap_or(graph_digest)
2434        .chars()
2435        .take(12)
2436        .collect::<String>();
2437    format!("bundle_run_{}_{}", sanitize_id(&bundle.id), suffix)
2438}
2439
2440fn sanitize_id(value: &str) -> String {
2441    value
2442        .chars()
2443        .map(|ch| {
2444            if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' {
2445                ch
2446            } else {
2447                '_'
2448            }
2449        })
2450        .collect()
2451}
2452
2453fn push_error(
2454    report: &mut WorkflowBundleValidationReport,
2455    path: impl Into<String>,
2456    message: impl Into<String>,
2457    node_id: Option<String>,
2458) {
2459    report.errors.push(WorkflowBundleDiagnostic {
2460        severity: "error".to_string(),
2461        path: path.into(),
2462        message: message.into(),
2463        node_id,
2464    });
2465}
2466
2467fn push_warning(
2468    report: &mut WorkflowBundleValidationReport,
2469    path: impl Into<String>,
2470    message: impl Into<String>,
2471    node_id: Option<String>,
2472) {
2473    report.warnings.push(WorkflowBundleDiagnostic {
2474        severity: "warning".to_string(),
2475        path: path.into(),
2476        message: message.into(),
2477        node_id,
2478    });
2479}
2480
2481#[cfg(test)]
2482mod tests;