1use std::collections::{BTreeMap, BTreeSet, VecDeque};
5use std::fs;
6use std::io::{Cursor, Read};
7use std::path::{Component, Path, PathBuf};
8
9use ed25519_dalek::{Signature, Verifier, VerifyingKey};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12
13use super::{validate_workflow, WorkflowEdge, WorkflowGraph};
14use crate::tool_annotations::ToolAnnotations;
15
16pub const WORKFLOW_BUNDLE_SCHEMA_VERSION: u32 = 2;
17pub const WORKFLOW_BUNDLE_RECEIPT_TYPE: &str = "harn.workflow_bundle.run";
18pub const HARNPACK_MANIFEST_PATH: &str = "harnpack.json";
19
20const DEFAULT_HARNPACK_FILE_MODE: u32 = 0o644;
21
22const MAX_HARNPACK_DECOMPRESSED_BYTES: u64 = 100 * 1024 * 1024;
28
29fn decompress_harnpack_zstd(bytes: &[u8]) -> Result<Vec<u8>, WorkflowBundleError> {
34 let mut decoder = zstd::stream::Decoder::new(Cursor::new(bytes))?;
35 let mut output = Vec::new();
36 let mut limited = (&mut decoder).take(MAX_HARNPACK_DECOMPRESSED_BYTES.saturating_add(1));
37 limited.read_to_end(&mut output)?;
38 if output.len() as u64 > MAX_HARNPACK_DECOMPRESSED_BYTES {
39 return Err(WorkflowBundleError::new(
40 WorkflowBundleErrorKind::InvalidArchive,
41 format!(
42 "harnpack zstd payload decompressed past max size ({MAX_HARNPACK_DECOMPRESSED_BYTES} bytes); refusing to extract"
43 ),
44 ));
45 }
46 Ok(output)
47}
48
49#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
50#[serde(default)]
51pub struct WorkflowBundle {
52 pub schema_version: u32,
53 pub entrypoint: PathBuf,
54 pub transitive_modules: Vec<ModuleEntry>,
55 pub stdlib_version: String,
56 pub harn_version: String,
57 pub provider_catalog_hash: String,
58 pub tool_manifest: Vec<ToolEntry>,
59 pub sbom: SBOMDoc,
60 pub signature: Option<Ed25519Signature>,
61 pub parent_trust_record_id: Option<String>,
62 pub id: String,
63 pub name: Option<String>,
64 pub version: String,
65 pub triggers: Vec<WorkflowBundleTrigger>,
66 pub workflow: WorkflowGraph,
67 pub prompt_capsules: BTreeMap<String, PromptCapsule>,
68 pub policy: WorkflowBundlePolicy,
69 pub connectors: Vec<ConnectorRequirement>,
70 pub environment: EnvironmentRequirements,
71 pub receipts: WorkflowBundleReplayMetadata,
72 pub metadata: BTreeMap<String, serde_json::Value>,
73}
74
75impl Default for WorkflowBundle {
76 fn default() -> Self {
77 Self {
78 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
79 entrypoint: PathBuf::new(),
80 transitive_modules: Vec::new(),
81 stdlib_version: env!("CARGO_PKG_VERSION").to_string(),
82 harn_version: env!("CARGO_PKG_VERSION").to_string(),
83 provider_catalog_hash: String::new(),
84 tool_manifest: Vec::new(),
85 sbom: SBOMDoc::default(),
86 signature: None,
87 parent_trust_record_id: None,
88 id: String::new(),
89 name: None,
90 version: String::new(),
91 triggers: Vec::new(),
92 workflow: WorkflowGraph::default(),
93 prompt_capsules: BTreeMap::new(),
94 policy: WorkflowBundlePolicy::default(),
95 connectors: Vec::new(),
96 environment: EnvironmentRequirements::default(),
97 receipts: WorkflowBundleReplayMetadata::default(),
98 metadata: BTreeMap::new(),
99 }
100 }
101}
102
103#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
104#[serde(default)]
105pub struct ModuleEntry {
106 pub path: PathBuf,
107 pub source_hash_blake3: String,
108 pub harnbc_hash_blake3: String,
109}
110
111#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
112#[serde(default)]
113pub struct ToolEntry {
114 pub name: String,
115 pub provider: Option<String>,
116 pub annotations: Option<ToolAnnotations>,
117 pub schema_hash_blake3: Option<String>,
118 pub metadata: BTreeMap<String, String>,
119}
120
121#[allow(clippy::upper_case_acronyms)]
122#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
123#[serde(default)]
124pub struct SBOMDoc {
125 pub format: String,
126 pub version: String,
127 pub packages: Vec<SBOMPackage>,
128 pub relationships: Vec<SBOMRelationship>,
129}
130
131#[allow(clippy::upper_case_acronyms)]
132#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
133#[serde(default)]
134pub struct SBOMPackage {
135 pub name: String,
136 pub version: Option<String>,
137 pub package_hash_blake3: Option<String>,
138 pub license: Option<String>,
139}
140
141#[allow(clippy::upper_case_acronyms)]
142#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
143#[serde(default)]
144pub struct SBOMRelationship {
145 pub from: String,
146 pub to: String,
147 pub relationship_type: String,
148}
149
150#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
151#[serde(default)]
152pub struct Ed25519Signature {
153 pub key_id: Option<String>,
154 pub public_key: String,
155 pub signature: String,
156 pub manifest_hash_blake3: String,
157 pub algorithm: String,
158}
159
160#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
161#[serde(default)]
162pub struct WorkflowBundleTrigger {
163 pub id: String,
164 pub kind: String,
165 pub provider: Option<String>,
166 pub events: Vec<String>,
167 pub schedule: Option<String>,
168 pub delay: Option<String>,
169 pub webhook_path: Option<String>,
170 pub mcp_tool: Option<String>,
171 pub resume_key: Option<String>,
172 pub node_id: Option<String>,
173 pub metadata: BTreeMap<String, String>,
174}
175
176#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
177#[serde(default)]
178pub struct PromptCapsule {
179 pub id: String,
180 pub node_id: String,
181 pub trigger_id: Option<String>,
182 pub prompt: String,
183 pub system: Option<String>,
184 pub context: BTreeMap<String, serde_json::Value>,
185}
186
187#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
188#[serde(default)]
189pub struct WorkflowBundlePolicy {
190 pub autonomy_tier: String,
191 pub tool_policy: BTreeMap<String, serde_json::Value>,
192 pub approval_required: Vec<String>,
193 pub retry: RetryPolicySpec,
194 pub catchup: CatchupPolicySpec,
195}
196
197impl Default for WorkflowBundlePolicy {
198 fn default() -> Self {
199 Self {
200 autonomy_tier: "act_with_approval".to_string(),
201 tool_policy: BTreeMap::new(),
202 approval_required: Vec::new(),
203 retry: RetryPolicySpec::default(),
204 catchup: CatchupPolicySpec::default(),
205 }
206 }
207}
208
209#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
210#[serde(default)]
211pub struct RetryPolicySpec {
212 pub max_attempts: u32,
213 pub backoff: String,
214}
215
216impl Default for RetryPolicySpec {
217 fn default() -> Self {
218 Self {
219 max_attempts: 1,
220 backoff: "none".to_string(),
221 }
222 }
223}
224
225#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
226#[serde(default)]
227pub struct CatchupPolicySpec {
228 pub mode: String,
229 pub max_events: Option<u32>,
230}
231
232impl Default for CatchupPolicySpec {
233 fn default() -> Self {
234 Self {
235 mode: "latest".to_string(),
236 max_events: Some(1),
237 }
238 }
239}
240
241#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
242#[serde(default)]
243pub struct ConnectorRequirement {
244 pub id: String,
245 pub provider_id: String,
246 pub scopes: Vec<String>,
247 pub setup_required: bool,
248 pub status_required: bool,
249}
250
251#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
252#[serde(default)]
253pub struct EnvironmentRequirements {
254 pub repo_setup_profile: Option<String>,
255 pub worktree_policy: String,
256 pub command_gates: Vec<String>,
257}
258
259impl Default for EnvironmentRequirements {
260 fn default() -> Self {
261 Self {
262 repo_setup_profile: None,
263 worktree_policy: "host_managed".to_string(),
264 command_gates: Vec::new(),
265 }
266 }
267}
268
269#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
270#[serde(default)]
271pub struct WorkflowBundleReplayMetadata {
272 pub run_id: Option<String>,
273 pub event_ids: Vec<String>,
274 pub workflow_version: Option<usize>,
275 pub graph_digest: Option<String>,
276}
277
278#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
279#[serde(default)]
280pub struct WorkflowBundleDiagnostic {
281 pub severity: String,
282 pub path: String,
283 pub message: String,
284 pub node_id: Option<String>,
285}
286
287#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
288#[serde(default)]
289pub struct WorkflowBundleValidationReport {
290 pub valid: bool,
291 pub bundle_id: String,
292 pub workflow_id: String,
293 pub graph_digest: String,
294 pub errors: Vec<WorkflowBundleDiagnostic>,
295 pub warnings: Vec<WorkflowBundleDiagnostic>,
296}
297
298#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
299pub struct WorkflowBundlePreview {
300 pub schema_version: u32,
301 pub bundle_id: String,
302 pub bundle_version: String,
303 pub workflow_id: String,
304 pub workflow_version: usize,
305 pub graph_digest: String,
306 pub validation: WorkflowBundleValidationReport,
307 pub graph: WorkflowBundleGraphExport,
308 pub mermaid: String,
309 pub triggers: Vec<WorkflowBundleTrigger>,
310 pub connectors: Vec<ConnectorRequirement>,
311 pub environment: EnvironmentRequirements,
312 pub nodes: Vec<WorkflowBundlePreviewNode>,
313 pub edges: Vec<WorkflowEdge>,
314 #[serde(default)]
319 pub metadata: BTreeMap<String, serde_json::Value>,
320}
321
322#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
323pub struct WorkflowBundlePreviewNode {
324 pub id: String,
325 pub kind: String,
326 pub label: Option<String>,
327 pub prompt_capsule: Option<String>,
328 pub trigger_ids: Vec<String>,
329 pub outgoing: Vec<String>,
330}
331
332#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
333pub struct WorkflowBundleGraphExport {
334 pub schema_version: u32,
335 pub graph_id: String,
336 pub graph_digest: String,
337 pub nodes: Vec<WorkflowBundleGraphNode>,
338 pub edges: Vec<WorkflowBundleGraphEdge>,
339 pub diagnostics: Vec<WorkflowBundleGraphDiagnostic>,
340 pub editable_fields: Vec<WorkflowBundleEditableField>,
341 pub mermaid: String,
342}
343
344#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
345pub struct WorkflowBundleGraphNode {
346 pub id: String,
347 pub node_type: String,
348 pub label: String,
349 pub workflow_node_id: Option<String>,
350 pub trigger_id: Option<String>,
351 pub connector_id: Option<String>,
352 pub editable_fields: Vec<WorkflowBundleEditableField>,
353 pub metadata: BTreeMap<String, serde_json::Value>,
354}
355
356#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
357pub struct WorkflowBundleGraphEdge {
358 pub from: String,
359 pub to: String,
360 pub label: Option<String>,
361 pub branch: Option<String>,
362}
363
364#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
365pub struct WorkflowBundleGraphDiagnostic {
366 pub severity: String,
367 pub path: String,
368 pub message: String,
369 pub node_id: Option<String>,
370 pub graph_node_id: Option<String>,
371}
372
373#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
374pub struct WorkflowBundleEditableField {
375 pub id: String,
376 pub label: String,
377 pub json_pointer: String,
378 pub value_type: String,
379 pub required: bool,
380 pub enum_values: Vec<String>,
381}
382
383#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
384#[serde(default)]
385pub struct WorkflowBundleRunRequest {
386 pub trigger_id: Option<String>,
387 pub event_id: Option<String>,
388}
389
390#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
391pub struct WorkflowBundleRunReceipt {
392 pub schema_version: u32,
393 pub receipt_type: String,
394 pub bundle_id: String,
395 pub bundle_version: String,
396 pub workflow_id: String,
397 pub workflow_version: usize,
398 pub graph_digest: String,
399 pub run_id: String,
400 pub trigger_id: Option<String>,
401 pub event_ids: Vec<String>,
402 pub status: String,
403 pub executed_nodes: Vec<WorkflowBundleRunNodeReceipt>,
404 pub policy: WorkflowBundlePolicy,
405 pub connectors: Vec<ConnectorRequirement>,
406 pub environment: EnvironmentRequirements,
407}
408
409#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
410pub struct WorkflowBundleRunNodeReceipt {
411 pub node_id: String,
412 pub kind: String,
413 pub prompt_capsule: Option<String>,
414 pub status: String,
415}
416
417#[derive(Clone, Debug, PartialEq, Eq)]
418pub struct HarnpackEntry {
419 pub path: PathBuf,
420 pub bytes: Vec<u8>,
421 pub mode: u32,
422}
423
424impl HarnpackEntry {
425 pub fn new(path: impl Into<PathBuf>, bytes: impl Into<Vec<u8>>) -> Self {
426 Self {
427 path: path.into(),
428 bytes: bytes.into(),
429 mode: DEFAULT_HARNPACK_FILE_MODE,
430 }
431 }
432
433 pub fn with_mode(mut self, mode: u32) -> Self {
434 self.mode = mode;
435 self
436 }
437}
438
439#[derive(Clone, Debug, PartialEq)]
440pub struct HarnpackArchive {
441 pub manifest: WorkflowBundle,
442 pub contents: Vec<HarnpackEntry>,
443}
444
445#[derive(Clone, Debug, PartialEq, Eq)]
446pub enum WorkflowBundleErrorKind {
447 Io,
448 Json,
449 MissingSchemaVersion,
450 UnsupportedSchemaVersion { actual: u32, expected: u32 },
451 InvalidArchive,
452 DuplicateArchiveEntry,
453 UnsafeArchivePath,
454 InvalidSignature,
455}
456
457#[derive(Clone, Debug, PartialEq, Eq)]
458pub struct WorkflowBundleError {
459 pub kind: WorkflowBundleErrorKind,
460 pub message: String,
461}
462
463impl WorkflowBundleError {
464 fn new(kind: WorkflowBundleErrorKind, message: impl Into<String>) -> Self {
465 Self {
466 kind,
467 message: message.into(),
468 }
469 }
470
471 fn unsupported_schema_version(actual: u32) -> Self {
472 Self::new(
473 WorkflowBundleErrorKind::UnsupportedSchemaVersion {
474 actual,
475 expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
476 },
477 format!(
478 "unsupported workflow bundle schema_version {actual}; expected {WORKFLOW_BUNDLE_SCHEMA_VERSION}"
479 ),
480 )
481 }
482}
483
484impl std::fmt::Display for WorkflowBundleError {
485 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
486 self.message.fmt(f)
487 }
488}
489
490impl std::error::Error for WorkflowBundleError {}
491
492impl From<std::io::Error> for WorkflowBundleError {
493 fn from(error: std::io::Error) -> Self {
494 Self::new(WorkflowBundleErrorKind::Io, error.to_string())
495 }
496}
497
498impl From<serde_json::Error> for WorkflowBundleError {
499 fn from(error: serde_json::Error) -> Self {
500 Self::new(WorkflowBundleErrorKind::Json, error.to_string())
501 }
502}
503
504pub fn load_workflow_bundle(path: &Path) -> Result<WorkflowBundle, WorkflowBundleError> {
505 let bytes = fs::read(path)?;
506 if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
507 read_harnpack(&bytes).map(|archive| archive.manifest)
508 } else {
509 parse_workflow_bundle_manifest(&bytes)
510 }
511}
512
513pub fn read_workflow_bundle_manifest_any_version(
519 bytes: &[u8],
520) -> Result<WorkflowBundle, WorkflowBundleError> {
521 let value: serde_json::Value = serde_json::from_slice(bytes)?;
522 if value.get("schema_version").is_none() {
523 return Err(WorkflowBundleError::new(
524 WorkflowBundleErrorKind::MissingSchemaVersion,
525 "workflow bundle manifest is missing schema_version",
526 ));
527 }
528 serde_json::from_value(value).map_err(Into::into)
529}
530
531pub fn load_workflow_bundle_any_version(
535 path: &Path,
536) -> Result<WorkflowBundle, WorkflowBundleError> {
537 let bytes = fs::read(path)?;
538 if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
539 let tar_bytes = decompress_harnpack_zstd(&bytes)?;
540 let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
541 for entry in archive.entries()? {
542 let mut entry = entry?;
543 if entry.header().entry_type().is_dir() {
544 continue;
545 }
546 let path = normalize_archive_path(entry.path()?.as_ref())?;
547 if path == HARNPACK_MANIFEST_PATH {
548 let mut entry_bytes = Vec::new();
549 entry.read_to_end(&mut entry_bytes)?;
550 return read_workflow_bundle_manifest_any_version(&entry_bytes);
551 }
552 }
553 Err(WorkflowBundleError::new(
554 WorkflowBundleErrorKind::InvalidArchive,
555 format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
556 ))
557 } else {
558 read_workflow_bundle_manifest_any_version(&bytes)
559 }
560}
561
562pub fn parse_workflow_bundle_manifest(bytes: &[u8]) -> Result<WorkflowBundle, WorkflowBundleError> {
563 let value: serde_json::Value = serde_json::from_slice(bytes)?;
564 let schema_version = value
565 .get("schema_version")
566 .and_then(serde_json::Value::as_u64)
567 .ok_or_else(|| {
568 WorkflowBundleError::new(
569 WorkflowBundleErrorKind::MissingSchemaVersion,
570 "workflow bundle manifest is missing numeric schema_version",
571 )
572 })?;
573 let actual = u32::try_from(schema_version).map_err(|_| {
574 WorkflowBundleError::new(
575 WorkflowBundleErrorKind::UnsupportedSchemaVersion {
576 actual: u32::MAX,
577 expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
578 },
579 format!(
580 "unsupported workflow bundle schema_version {schema_version}; expected {WORKFLOW_BUNDLE_SCHEMA_VERSION}"
581 ),
582 )
583 })?;
584 if actual != WORKFLOW_BUNDLE_SCHEMA_VERSION {
585 return Err(WorkflowBundleError::unsupported_schema_version(actual));
586 }
587 serde_json::from_value(value).map_err(Into::into)
588}
589
590pub fn canonical_workflow_bundle_manifest_bytes(
591 bundle: &WorkflowBundle,
592) -> Result<Vec<u8>, WorkflowBundleError> {
593 serde_json::to_vec(&canonical_workflow_bundle_manifest(bundle)).map_err(Into::into)
594}
595
596pub fn workflow_bundle_hash(
597 bundle: &WorkflowBundle,
598 contents: &[HarnpackEntry],
599) -> Result<String, WorkflowBundleError> {
600 let mut hasher = blake3::Hasher::new();
601 let mut canonical = canonical_workflow_bundle_manifest(bundle);
602 canonical.signature = None;
603 let manifest_bytes = serde_json::to_vec(&canonical)?;
604 hasher.update(&manifest_bytes);
605
606 let mut content_hashes = contents
607 .iter()
608 .map(|entry| blake3_hash_bytes(&entry.bytes))
609 .collect::<Vec<_>>();
610 content_hashes.sort();
611 for content_hash in content_hashes {
612 hasher.update(b"\n");
613 hasher.update(content_hash.as_bytes());
614 }
615
616 Ok(blake3_digest_string(hasher.finalize()))
617}
618
619pub fn verify_workflow_bundle_signature(
620 bundle: &WorkflowBundle,
621 contents: &[HarnpackEntry],
622) -> Result<(), WorkflowBundleError> {
623 let signature = bundle.signature.as_ref().ok_or_else(|| {
624 WorkflowBundleError::new(
625 WorkflowBundleErrorKind::InvalidSignature,
626 "workflow bundle is unsigned",
627 )
628 })?;
629 if signature.algorithm != "ed25519" {
630 return Err(WorkflowBundleError::new(
631 WorkflowBundleErrorKind::InvalidSignature,
632 format!(
633 "unsupported workflow bundle signature algorithm {}",
634 signature.algorithm
635 ),
636 ));
637 }
638 let expected_hash = workflow_bundle_hash(bundle, contents)?;
639 if signature.manifest_hash_blake3 != expected_hash {
640 return Err(WorkflowBundleError::new(
641 WorkflowBundleErrorKind::InvalidSignature,
642 format!(
643 "workflow bundle signature hash mismatch; expected {expected_hash}, found {}",
644 signature.manifest_hash_blake3
645 ),
646 ));
647 }
648 let public_key_bytes = decode_hex_exact::<32>(
649 "workflow bundle signature public_key",
650 &signature.public_key,
651 )?;
652 let signature_bytes =
653 decode_hex_exact::<64>("workflow bundle signature", &signature.signature)?;
654 let verifying_key = VerifyingKey::from_bytes(&public_key_bytes).map_err(|error| {
655 WorkflowBundleError::new(
656 WorkflowBundleErrorKind::InvalidSignature,
657 format!("workflow bundle signature public_key is invalid Ed25519: {error}"),
658 )
659 })?;
660 let ed25519_signature = Signature::from_bytes(&signature_bytes);
661 verifying_key
662 .verify(expected_hash.as_bytes(), &ed25519_signature)
663 .map_err(|error| {
664 WorkflowBundleError::new(
665 WorkflowBundleErrorKind::InvalidSignature,
666 format!("workflow bundle signature failed Ed25519 verification: {error}"),
667 )
668 })
669}
670
671pub fn build_harnpack(
672 bundle: &WorkflowBundle,
673 contents: &[HarnpackEntry],
674) -> Result<Vec<u8>, WorkflowBundleError> {
675 let manifest_bytes = canonical_workflow_bundle_manifest_bytes(bundle)?;
676 let mut entries = contents
677 .iter()
678 .map(|entry| {
679 normalize_archive_path(&entry.path).map(|path| (path, entry.bytes.clone(), entry.mode))
680 })
681 .collect::<Result<Vec<_>, _>>()?;
682 entries.sort_by(|left, right| left.0.cmp(&right.0));
683
684 let mut seen = BTreeSet::new();
685 for (path, _, _) in &entries {
686 if path == HARNPACK_MANIFEST_PATH {
687 return Err(WorkflowBundleError::new(
688 WorkflowBundleErrorKind::DuplicateArchiveEntry,
689 format!("archive content cannot replace {HARNPACK_MANIFEST_PATH}"),
690 ));
691 }
692 if !seen.insert(path.clone()) {
693 return Err(WorkflowBundleError::new(
694 WorkflowBundleErrorKind::DuplicateArchiveEntry,
695 format!("duplicate archive entry: {path}"),
696 ));
697 }
698 }
699
700 let mut tar_bytes = Vec::new();
701 {
702 let mut builder = tar::Builder::new(&mut tar_bytes);
703 append_harnpack_entry(
704 &mut builder,
705 HARNPACK_MANIFEST_PATH,
706 &manifest_bytes,
707 DEFAULT_HARNPACK_FILE_MODE,
708 )?;
709 for (path, bytes, mode) in entries {
710 append_harnpack_entry(&mut builder, &path, &bytes, mode)?;
711 }
712 builder.finish()?;
713 }
714
715 zstd::stream::encode_all(Cursor::new(tar_bytes), 0).map_err(Into::into)
716}
717
718pub fn read_harnpack(bytes: &[u8]) -> Result<HarnpackArchive, WorkflowBundleError> {
719 let tar_bytes = decompress_harnpack_zstd(bytes)?;
720 let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
721 let mut manifest = None;
722 let mut contents = Vec::new();
723 let mut seen = BTreeSet::new();
724
725 for entry in archive.entries()? {
726 let mut entry = entry?;
727 if entry.header().entry_type().is_dir() {
728 continue;
729 }
730 let path = normalize_archive_path(entry.path()?.as_ref())?;
731 if !seen.insert(path.clone()) {
732 return Err(WorkflowBundleError::new(
733 WorkflowBundleErrorKind::DuplicateArchiveEntry,
734 format!("duplicate archive entry: {path}"),
735 ));
736 }
737 let mode = entry.header().mode().unwrap_or(DEFAULT_HARNPACK_FILE_MODE);
738 let mut entry_bytes = Vec::new();
739 entry.read_to_end(&mut entry_bytes)?;
740
741 if path == HARNPACK_MANIFEST_PATH {
742 manifest = Some(parse_workflow_bundle_manifest(&entry_bytes)?);
743 } else {
744 contents.push(HarnpackEntry {
745 path: PathBuf::from(path),
746 bytes: entry_bytes,
747 mode,
748 });
749 }
750 }
751
752 contents.sort_by(|left, right| left.path.cmp(&right.path));
753 Ok(HarnpackArchive {
754 manifest: manifest.ok_or_else(|| {
755 WorkflowBundleError::new(
756 WorkflowBundleErrorKind::InvalidArchive,
757 format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
758 )
759 })?,
760 contents,
761 })
762}
763
764pub fn current_provider_catalog_hash_blake3() -> Result<String, WorkflowBundleError> {
765 let bytes = serde_json::to_vec(&crate::provider_catalog::artifact())?;
766 Ok(blake3_hash_bytes(&bytes))
767}
768
769pub fn workflow_graph_digest(graph: &WorkflowGraph) -> String {
770 let mut canonical = canonical_workflow_graph(graph);
771 canonical.audit_log.clear();
772 let bytes = serde_json::to_vec(&canonical).expect("workflow graph serializes");
773 let digest = Sha256::digest(bytes);
774 let hex = digest
775 .iter()
776 .map(|byte| format!("{byte:02x}"))
777 .collect::<String>();
778 format!("sha256:{hex}")
779}
780
781fn canonical_workflow_bundle_manifest(bundle: &WorkflowBundle) -> WorkflowBundle {
782 let mut canonical = bundle.clone();
783 canonical.workflow = canonical_workflow_graph(&bundle.workflow);
784 canonical.transitive_modules.sort_by(|left, right| {
785 (
786 path_sort_key(&left.path),
787 &left.source_hash_blake3,
788 &left.harnbc_hash_blake3,
789 )
790 .cmp(&(
791 path_sort_key(&right.path),
792 &right.source_hash_blake3,
793 &right.harnbc_hash_blake3,
794 ))
795 });
796 canonical.tool_manifest.sort_by(|left, right| {
797 (&left.name, &left.provider, &left.schema_hash_blake3).cmp(&(
798 &right.name,
799 &right.provider,
800 &right.schema_hash_blake3,
801 ))
802 });
803 canonical.sbom.packages.sort_by(|left, right| {
804 (&left.name, &left.version, &left.package_hash_blake3).cmp(&(
805 &right.name,
806 &right.version,
807 &right.package_hash_blake3,
808 ))
809 });
810 canonical.sbom.relationships.sort_by(|left, right| {
811 (&left.from, &left.to, &left.relationship_type).cmp(&(
812 &right.from,
813 &right.to,
814 &right.relationship_type,
815 ))
816 });
817 canonical
818}
819
820fn decode_hex_exact<const N: usize>(
821 label: &str,
822 value: &str,
823) -> Result<[u8; N], WorkflowBundleError> {
824 let bytes = hex::decode(value).map_err(|error| {
825 WorkflowBundleError::new(
826 WorkflowBundleErrorKind::InvalidSignature,
827 format!("{label} is not hex: {error}"),
828 )
829 })?;
830 bytes.try_into().map_err(|bytes: Vec<u8>| {
831 WorkflowBundleError::new(
832 WorkflowBundleErrorKind::InvalidSignature,
833 format!("{label} must decode to {N} bytes, got {}", bytes.len()),
834 )
835 })
836}
837
838fn append_harnpack_entry<W: std::io::Write>(
839 builder: &mut tar::Builder<W>,
840 path: &str,
841 bytes: &[u8],
842 mode: u32,
843) -> Result<(), WorkflowBundleError> {
844 let mut header = tar::Header::new_gnu();
845 header.set_path(path).map_err(|error| {
846 WorkflowBundleError::new(
847 WorkflowBundleErrorKind::UnsafeArchivePath,
848 format!("invalid archive path {path}: {error}"),
849 )
850 })?;
851 header.set_entry_type(tar::EntryType::Regular);
852 header.set_size(bytes.len() as u64);
853 header.set_mode(mode);
854 header.set_mtime(0);
855 header.set_uid(0);
856 header.set_gid(0);
857 header.set_cksum();
858 builder.append(&header, bytes)?;
859 Ok(())
860}
861
862fn normalize_archive_path(path: &Path) -> Result<String, WorkflowBundleError> {
863 let mut parts = Vec::new();
864 for component in path.components() {
865 match component {
866 Component::Normal(part) => {
867 let Some(part) = part.to_str() else {
868 return Err(WorkflowBundleError::new(
869 WorkflowBundleErrorKind::UnsafeArchivePath,
870 format!("archive path is not valid UTF-8: {}", path.display()),
871 ));
872 };
873 if part.is_empty() {
874 return Err(WorkflowBundleError::new(
875 WorkflowBundleErrorKind::UnsafeArchivePath,
876 "archive path contains an empty component",
877 ));
878 }
879 parts.push(part.to_string());
880 }
881 Component::CurDir => {}
882 Component::ParentDir | Component::RootDir | Component::Prefix(_) => {
883 return Err(WorkflowBundleError::new(
884 WorkflowBundleErrorKind::UnsafeArchivePath,
885 format!(
886 "archive path must be relative and contained: {}",
887 path.display()
888 ),
889 ));
890 }
891 }
892 }
893 if parts.is_empty() {
894 return Err(WorkflowBundleError::new(
895 WorkflowBundleErrorKind::UnsafeArchivePath,
896 "archive path is empty",
897 ));
898 }
899 Ok(parts.join("/"))
900}
901
902fn path_sort_key(path: &Path) -> String {
903 path.components()
904 .filter_map(|component| match component {
905 Component::Normal(part) => part.to_str().map(ToOwned::to_owned),
906 _ => None,
907 })
908 .collect::<Vec<_>>()
909 .join("/")
910}
911
912fn blake3_hash_bytes(bytes: &[u8]) -> String {
913 blake3_digest_string(blake3::hash(bytes))
914}
915
916fn blake3_digest_string(hash: blake3::Hash) -> String {
917 format!("blake3:{hash}")
918}
919
920pub fn validate_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundleValidationReport {
921 let canonical = canonical_workflow_graph(&bundle.workflow);
922 let mut report = WorkflowBundleValidationReport {
923 valid: true,
924 bundle_id: bundle.id.clone(),
925 workflow_id: canonical.id.clone(),
926 graph_digest: workflow_graph_digest(&canonical),
927 errors: Vec::new(),
928 warnings: Vec::new(),
929 };
930
931 validate_manifest_contract(bundle, &mut report);
932 validate_bundle_identity(bundle, &canonical, &mut report);
933 validate_triggers(bundle, &canonical, &mut report);
934 validate_prompt_capsules(bundle, &canonical, &mut report);
935 validate_policy(bundle, &mut report);
936 validate_connectors(bundle, &mut report);
937 validate_environment(bundle, &mut report);
938
939 let graph_report = validate_workflow(&canonical, None);
940 for error in graph_report.errors {
941 let node_id = workflow_diagnostic_node_id(&error, &canonical);
942 push_error(&mut report, "workflow", error, node_id);
943 }
944 for warning in graph_report.warnings {
945 let node_id = workflow_diagnostic_node_id(&warning, &canonical);
946 push_warning(&mut report, "workflow", warning, node_id);
947 }
948
949 if let Some(expected) = bundle.receipts.graph_digest.as_deref() {
950 if expected != report.graph_digest {
951 let actual = report.graph_digest.clone();
952 push_error(
953 &mut report,
954 "receipts.graph_digest",
955 format!("graph digest mismatch: expected {expected}, computed {actual}"),
956 None,
957 );
958 }
959 }
960 if let Some(expected_version) = bundle.receipts.workflow_version {
961 if expected_version != canonical.version {
962 push_error(
963 &mut report,
964 "receipts.workflow_version",
965 format!(
966 "workflow version mismatch: expected {expected_version}, computed {}",
967 canonical.version
968 ),
969 None,
970 );
971 }
972 }
973
974 report.valid = report.errors.is_empty();
975 report
976}
977
978pub fn preview_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundlePreview {
979 let canonical = canonical_workflow_graph(&bundle.workflow);
980 let validation = validate_workflow_bundle(bundle);
981 let graph = export_workflow_bundle_graph(bundle, &validation);
982 let mermaid = graph.mermaid.clone();
983 let triggers_by_node = triggers_by_node(bundle);
984 let capsules_by_node = capsules_by_node(bundle);
985 let mut nodes = Vec::new();
986
987 for (node_id, node) in &canonical.nodes {
988 let mut outgoing = canonical
989 .edges
990 .iter()
991 .filter(|edge| edge.from == *node_id)
992 .map(|edge| edge.to.clone())
993 .collect::<Vec<_>>();
994 outgoing.sort();
995 outgoing.dedup();
996 nodes.push(WorkflowBundlePreviewNode {
997 id: node_id.clone(),
998 kind: node.kind.clone(),
999 label: node.task_label.clone(),
1000 prompt_capsule: capsules_by_node.get(node_id).cloned(),
1001 trigger_ids: triggers_by_node.get(node_id).cloned().unwrap_or_default(),
1002 outgoing,
1003 });
1004 }
1005
1006 WorkflowBundlePreview {
1007 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1008 bundle_id: bundle.id.clone(),
1009 bundle_version: bundle.version.clone(),
1010 workflow_id: canonical.id.clone(),
1011 workflow_version: canonical.version,
1012 graph_digest: validation.graph_digest.clone(),
1013 validation,
1014 graph,
1015 mermaid,
1016 triggers: bundle.triggers.clone(),
1017 connectors: bundle.connectors.clone(),
1018 environment: bundle.environment.clone(),
1019 nodes,
1020 edges: sorted_edges(&canonical),
1021 metadata: bundle.metadata.clone(),
1022 }
1023}
1024
1025pub fn export_workflow_bundle_graph(
1026 bundle: &WorkflowBundle,
1027 validation: &WorkflowBundleValidationReport,
1028) -> WorkflowBundleGraphExport {
1029 let canonical = canonical_workflow_graph(&bundle.workflow);
1030 let mut nodes = Vec::new();
1031 let mut edges = Vec::new();
1032 let mut editable_fields = Vec::new();
1033 let capsules_by_node = capsules_by_node(bundle);
1034 let catchup_enabled = bundle.policy.catchup.mode != "none";
1035 let retry_can_dlq = bundle.policy.retry.max_attempts > 1;
1036
1037 for (index, connector) in bundle.connectors.iter().enumerate() {
1038 let node_fields = connector_editable_fields(index, connector);
1039 editable_fields.extend(node_fields.clone());
1040 nodes.push(WorkflowBundleGraphNode {
1041 id: connector_graph_id(&connector.id),
1042 node_type: "connector_call".to_string(),
1043 label: connector_label(connector),
1044 workflow_node_id: None,
1045 trigger_id: None,
1046 connector_id: Some(connector.id.clone()),
1047 editable_fields: node_fields,
1048 metadata: BTreeMap::from([
1049 (
1050 "provider_id".to_string(),
1051 serde_json::json!(connector.provider_id),
1052 ),
1053 ("scopes".to_string(), serde_json::json!(connector.scopes)),
1054 ]),
1055 });
1056 }
1057
1058 let catchup_fields = catchup_editable_fields();
1059 let retry_fields = retry_editable_fields();
1060 if catchup_enabled {
1061 let node_fields = catchup_fields;
1062 editable_fields.extend(node_fields.clone());
1063 nodes.push(WorkflowBundleGraphNode {
1064 id: catchup_graph_id(),
1065 node_type: "catchup".to_string(),
1066 label: "Catch up".to_string(),
1067 workflow_node_id: None,
1068 trigger_id: None,
1069 connector_id: None,
1070 editable_fields: node_fields,
1071 metadata: BTreeMap::from([(
1072 "mode".to_string(),
1073 serde_json::json!(bundle.policy.catchup.mode),
1074 )]),
1075 });
1076 } else {
1077 editable_fields.extend(catchup_fields);
1078 }
1079 if retry_can_dlq {
1080 let node_fields = retry_fields;
1081 editable_fields.extend(node_fields.clone());
1082 nodes.push(WorkflowBundleGraphNode {
1083 id: dlq_graph_id(),
1084 node_type: "dlq".to_string(),
1085 label: "Dead letter queue".to_string(),
1086 workflow_node_id: None,
1087 trigger_id: None,
1088 connector_id: None,
1089 editable_fields: node_fields,
1090 metadata: BTreeMap::from([(
1091 "max_attempts".to_string(),
1092 serde_json::json!(bundle.policy.retry.max_attempts),
1093 )]),
1094 });
1095 } else {
1096 editable_fields.extend(retry_fields);
1097 }
1098
1099 for (index, trigger) in bundle.triggers.iter().enumerate() {
1100 let node_fields = trigger_editable_fields(index, trigger);
1101 editable_fields.extend(node_fields.clone());
1102 nodes.push(WorkflowBundleGraphNode {
1103 id: trigger_graph_id(&trigger.id),
1104 node_type: "trigger".to_string(),
1105 label: trigger_label(trigger),
1106 workflow_node_id: trigger.node_id.clone(),
1107 trigger_id: Some(trigger.id.clone()),
1108 connector_id: None,
1109 editable_fields: node_fields,
1110 metadata: BTreeMap::from([
1111 ("kind".to_string(), serde_json::json!(trigger.kind)),
1112 ("provider".to_string(), serde_json::json!(trigger.provider)),
1113 ("events".to_string(), serde_json::json!(trigger.events)),
1114 ]),
1115 });
1116 if let Some(provider) = trigger.provider.as_deref() {
1117 if let Some(connector) = bundle
1118 .connectors
1119 .iter()
1120 .find(|connector| connector.provider_id == provider || connector.id == provider)
1121 {
1122 edges.push(WorkflowBundleGraphEdge {
1123 from: connector_graph_id(&connector.id),
1124 to: trigger_graph_id(&trigger.id),
1125 label: Some("binds".to_string()),
1126 branch: None,
1127 });
1128 }
1129 }
1130 let target = trigger
1131 .node_id
1132 .clone()
1133 .unwrap_or_else(|| canonical.entry.clone());
1134 if catchup_enabled {
1135 edges.push(WorkflowBundleGraphEdge {
1136 from: trigger_graph_id(&trigger.id),
1137 to: catchup_graph_id(),
1138 label: Some(bundle.policy.catchup.mode.clone()),
1139 branch: Some("catchup".to_string()),
1140 });
1141 edges.push(WorkflowBundleGraphEdge {
1142 from: catchup_graph_id(),
1143 to: workflow_graph_id(&target),
1144 label: Some("dispatch".to_string()),
1145 branch: None,
1146 });
1147 } else {
1148 edges.push(WorkflowBundleGraphEdge {
1149 from: trigger_graph_id(&trigger.id),
1150 to: workflow_graph_id(&target),
1151 label: Some("dispatch".to_string()),
1152 branch: None,
1153 });
1154 }
1155 }
1156
1157 for (node_id, node) in &canonical.nodes {
1158 let capsule_id = capsules_by_node.get(node_id);
1159 let node_fields = workflow_node_editable_fields(node_id, capsule_id);
1160 editable_fields.extend(node_fields.clone());
1161 nodes.push(WorkflowBundleGraphNode {
1162 id: workflow_graph_id(node_id),
1163 node_type: workflow_node_type(&node.kind),
1164 label: workflow_node_label(node_id, node),
1165 workflow_node_id: Some(node_id.clone()),
1166 trigger_id: None,
1167 connector_id: None,
1168 editable_fields: node_fields,
1169 metadata: BTreeMap::from([
1170 ("kind".to_string(), serde_json::json!(node.kind)),
1171 ("task_label".to_string(), serde_json::json!(node.task_label)),
1172 (
1173 "prompt_capsule".to_string(),
1174 serde_json::json!(capsule_id.cloned()),
1175 ),
1176 ]),
1177 });
1178 }
1179
1180 for edge in sorted_edges(&canonical) {
1181 edges.push(WorkflowBundleGraphEdge {
1182 from: workflow_graph_id(&edge.from),
1183 to: workflow_graph_id(&edge.to),
1184 label: edge.label.clone(),
1185 branch: edge.branch.clone(),
1186 });
1187 }
1188
1189 let outgoing: BTreeSet<&str> = canonical
1190 .edges
1191 .iter()
1192 .map(|edge| edge.from.as_str())
1193 .collect();
1194 for node_id in canonical.nodes.keys() {
1195 if !outgoing.contains(node_id.as_str()) {
1196 edges.push(WorkflowBundleGraphEdge {
1197 from: workflow_graph_id(node_id),
1198 to: terminal_completed_graph_id(),
1199 label: Some("completed".to_string()),
1200 branch: Some("completed".to_string()),
1201 });
1202 }
1203 if retry_can_dlq {
1204 edges.push(WorkflowBundleGraphEdge {
1205 from: workflow_graph_id(node_id),
1206 to: dlq_graph_id(),
1207 label: Some("retry exhausted".to_string()),
1208 branch: Some("failed".to_string()),
1209 });
1210 }
1211 }
1212
1213 nodes.push(WorkflowBundleGraphNode {
1214 id: terminal_completed_graph_id(),
1215 node_type: "terminal".to_string(),
1216 label: "Completed".to_string(),
1217 workflow_node_id: None,
1218 trigger_id: None,
1219 connector_id: None,
1220 editable_fields: Vec::new(),
1221 metadata: BTreeMap::from([("status".to_string(), serde_json::json!("completed"))]),
1222 });
1223 nodes.push(WorkflowBundleGraphNode {
1224 id: terminal_failed_graph_id(),
1225 node_type: "terminal".to_string(),
1226 label: "Failed".to_string(),
1227 workflow_node_id: None,
1228 trigger_id: None,
1229 connector_id: None,
1230 editable_fields: Vec::new(),
1231 metadata: BTreeMap::from([("status".to_string(), serde_json::json!("failed"))]),
1232 });
1233 if retry_can_dlq {
1234 edges.push(WorkflowBundleGraphEdge {
1235 from: dlq_graph_id(),
1236 to: terminal_failed_graph_id(),
1237 label: Some("failed".to_string()),
1238 branch: Some("failed".to_string()),
1239 });
1240 }
1241
1242 nodes.sort_by(|left, right| left.id.cmp(&right.id));
1243 edges.sort_by(|left, right| {
1244 (&left.from, &left.to, &left.branch, &left.label).cmp(&(
1245 &right.from,
1246 &right.to,
1247 &right.branch,
1248 &right.label,
1249 ))
1250 });
1251 editable_fields.sort_by(|left, right| left.id.cmp(&right.id));
1252
1253 let diagnostics = validation
1254 .errors
1255 .iter()
1256 .chain(validation.warnings.iter())
1257 .map(|diagnostic| WorkflowBundleGraphDiagnostic {
1258 severity: diagnostic.severity.clone(),
1259 path: diagnostic.path.clone(),
1260 message: diagnostic.message.clone(),
1261 node_id: diagnostic.node_id.clone(),
1262 graph_node_id: diagnostic.node_id.as_deref().map(workflow_graph_id),
1263 })
1264 .collect::<Vec<_>>();
1265 let mermaid = render_workflow_bundle_mermaid(&nodes, &edges);
1266
1267 WorkflowBundleGraphExport {
1268 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1269 graph_id: canonical.id,
1270 graph_digest: validation.graph_digest.clone(),
1271 nodes,
1272 edges,
1273 diagnostics,
1274 editable_fields,
1275 mermaid,
1276 }
1277}
1278
1279pub fn run_workflow_bundle(
1280 bundle: &WorkflowBundle,
1281 request: WorkflowBundleRunRequest,
1282) -> Result<WorkflowBundleRunReceipt, WorkflowBundleValidationReport> {
1283 let validation = validate_workflow_bundle(bundle);
1284 if !validation.valid {
1285 return Err(validation);
1286 }
1287
1288 let canonical = canonical_workflow_graph(&bundle.workflow);
1289 let trigger_id = match request.trigger_id {
1290 Some(trigger_id)
1291 if !bundle
1292 .triggers
1293 .iter()
1294 .any(|trigger| trigger.id == trigger_id) =>
1295 {
1296 let mut report = validation;
1297 push_error(
1298 &mut report,
1299 "trigger_id",
1300 format!("unknown trigger id: {trigger_id}"),
1301 None,
1302 );
1303 report.valid = false;
1304 return Err(report);
1305 }
1306 Some(trigger_id) => Some(trigger_id),
1307 None => bundle.triggers.first().map(|trigger| trigger.id.clone()),
1308 };
1309 let mut event_ids = bundle.receipts.event_ids.clone();
1310 if let Some(event_id) = request.event_id {
1311 if !event_ids.contains(&event_id) {
1312 event_ids.push(event_id);
1313 }
1314 }
1315 let run_id = bundle
1316 .receipts
1317 .run_id
1318 .clone()
1319 .unwrap_or_else(|| default_run_id(bundle, &validation.graph_digest));
1320 let capsules_by_node = capsules_by_node(bundle);
1321 let executed_nodes = execution_order(&canonical)
1322 .into_iter()
1323 .map(|node_id| {
1324 let node = canonical
1325 .nodes
1326 .get(&node_id)
1327 .expect("execution order only contains known nodes");
1328 WorkflowBundleRunNodeReceipt {
1329 node_id: node_id.clone(),
1330 kind: node.kind.clone(),
1331 prompt_capsule: capsules_by_node.get(&node_id).cloned(),
1332 status: "completed".to_string(),
1333 }
1334 })
1335 .collect();
1336
1337 Ok(WorkflowBundleRunReceipt {
1338 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1339 receipt_type: WORKFLOW_BUNDLE_RECEIPT_TYPE.to_string(),
1340 bundle_id: bundle.id.clone(),
1341 bundle_version: bundle.version.clone(),
1342 workflow_id: canonical.id,
1343 workflow_version: canonical.version,
1344 graph_digest: validation.graph_digest,
1345 run_id,
1346 trigger_id,
1347 event_ids,
1348 status: "completed".to_string(),
1349 executed_nodes,
1350 policy: bundle.policy.clone(),
1351 connectors: bundle.connectors.clone(),
1352 environment: bundle.environment.clone(),
1353 })
1354}
1355
1356fn canonical_workflow_graph(graph: &WorkflowGraph) -> WorkflowGraph {
1357 let mut canonical = graph.clone();
1358 if canonical.type_name.is_empty() {
1359 canonical.type_name = "workflow_graph".to_string();
1360 }
1361 if canonical.version == 0 {
1362 canonical.version = 1;
1363 }
1364 if canonical.entry.is_empty() {
1365 canonical.entry = canonical.nodes.keys().next().cloned().unwrap_or_default();
1366 }
1367 for (node_id, node) in &mut canonical.nodes {
1368 if node.id.is_none() {
1369 node.id = Some(node_id.clone());
1370 }
1371 if node.kind.is_empty() {
1372 node.kind = "stage".to_string();
1373 }
1374 if node.retry_policy.max_attempts == 0 {
1375 node.retry_policy.max_attempts = 1;
1376 }
1377 }
1378 canonical.edges = sorted_edges(&canonical);
1379 canonical
1380}
1381
1382fn sorted_edges(graph: &WorkflowGraph) -> Vec<WorkflowEdge> {
1383 let mut edges = graph.edges.clone();
1384 edges.sort_by(|left, right| {
1385 (
1386 &left.from,
1387 &left.to,
1388 left.branch.as_deref(),
1389 left.label.as_deref(),
1390 )
1391 .cmp(&(
1392 &right.from,
1393 &right.to,
1394 right.branch.as_deref(),
1395 right.label.as_deref(),
1396 ))
1397 });
1398 edges
1399}
1400
1401fn validate_bundle_identity(
1402 bundle: &WorkflowBundle,
1403 graph: &WorkflowGraph,
1404 report: &mut WorkflowBundleValidationReport,
1405) {
1406 if bundle.schema_version != WORKFLOW_BUNDLE_SCHEMA_VERSION {
1407 push_error(
1408 report,
1409 "schema_version",
1410 format!(
1411 "unsupported schema_version {}; expected {}",
1412 bundle.schema_version, WORKFLOW_BUNDLE_SCHEMA_VERSION
1413 ),
1414 None,
1415 );
1416 }
1417 if bundle.id.trim().is_empty() {
1418 push_error(report, "id", "bundle id is required", None);
1419 }
1420 if bundle.version.trim().is_empty() {
1421 push_error(report, "version", "bundle version is required", None);
1422 }
1423 if graph.id.trim().is_empty() {
1424 push_error(
1425 report,
1426 "workflow.id",
1427 "workflow id is required for portable bundles",
1428 None,
1429 );
1430 }
1431 if graph.nodes.is_empty() {
1432 push_error(
1433 report,
1434 "workflow.nodes",
1435 "workflow must contain nodes",
1436 None,
1437 );
1438 }
1439 for (node_id, node) in &graph.nodes {
1440 if node_id.trim().is_empty() {
1441 push_error(report, "workflow.nodes", "node id is required", None);
1442 }
1443 if node.id.as_deref().is_some_and(|id| id != node_id) {
1444 push_error(
1445 report,
1446 format!("workflow.nodes.{node_id}.id"),
1447 "node id field must match its map key",
1448 Some(node_id.clone()),
1449 );
1450 }
1451 }
1452}
1453
1454fn validate_manifest_contract(
1455 bundle: &WorkflowBundle,
1456 report: &mut WorkflowBundleValidationReport,
1457) {
1458 validate_relative_path(
1459 report,
1460 "entrypoint",
1461 &bundle.entrypoint,
1462 "entrypoint is required",
1463 );
1464 if bundle.transitive_modules.is_empty() {
1465 push_error(
1466 report,
1467 "transitive_modules",
1468 "at least one transitive module entry is required",
1469 None,
1470 );
1471 }
1472 let mut module_paths = BTreeSet::new();
1473 for (index, module) in bundle.transitive_modules.iter().enumerate() {
1474 let path = format!("transitive_modules[{index}]");
1475 validate_relative_path(
1476 report,
1477 format!("{path}.path"),
1478 &module.path,
1479 "module path is required",
1480 );
1481 if !module.path.as_os_str().is_empty() && !module_paths.insert(path_sort_key(&module.path))
1482 {
1483 push_error(
1484 report,
1485 format!("{path}.path"),
1486 format!(
1487 "duplicate transitive module path: {}",
1488 module.path.display()
1489 ),
1490 None,
1491 );
1492 }
1493 validate_blake3_hash(
1494 report,
1495 format!("{path}.source_hash_blake3"),
1496 &module.source_hash_blake3,
1497 true,
1498 );
1499 validate_blake3_hash(
1500 report,
1501 format!("{path}.harnbc_hash_blake3"),
1502 &module.harnbc_hash_blake3,
1503 true,
1504 );
1505 }
1506 if bundle.stdlib_version.trim().is_empty() {
1507 push_error(report, "stdlib_version", "stdlib_version is required", None);
1508 }
1509 if bundle.harn_version.trim().is_empty() {
1510 push_error(report, "harn_version", "harn_version is required", None);
1511 }
1512 validate_blake3_hash(
1513 report,
1514 "provider_catalog_hash",
1515 &bundle.provider_catalog_hash,
1516 true,
1517 );
1518
1519 let mut tool_names = BTreeSet::new();
1520 for (index, tool) in bundle.tool_manifest.iter().enumerate() {
1521 let path = format!("tool_manifest[{index}]");
1522 if tool.name.trim().is_empty() {
1523 push_error(
1524 report,
1525 format!("{path}.name"),
1526 "tool manifest entry name is required",
1527 None,
1528 );
1529 } else if !tool_names.insert(tool.name.clone()) {
1530 push_error(
1531 report,
1532 format!("{path}.name"),
1533 format!("duplicate tool manifest entry: {}", tool.name),
1534 None,
1535 );
1536 }
1537 if let Some(hash) = tool.schema_hash_blake3.as_deref() {
1538 validate_blake3_hash(report, format!("{path}.schema_hash_blake3"), hash, true);
1539 }
1540 }
1541
1542 if bundle.sbom.format.trim().is_empty() {
1543 push_error(report, "sbom.format", "SBOM format is required", None);
1544 }
1545 if bundle.sbom.version.trim().is_empty() {
1546 push_error(report, "sbom.version", "SBOM version is required", None);
1547 }
1548 let mut sbom_packages = BTreeSet::new();
1549 for (index, package) in bundle.sbom.packages.iter().enumerate() {
1550 let path = format!("sbom.packages[{index}]");
1551 if package.name.trim().is_empty() {
1552 push_error(
1553 report,
1554 format!("{path}.name"),
1555 "SBOM package name is required",
1556 None,
1557 );
1558 } else if !sbom_packages.insert(package.name.clone()) {
1559 push_error(
1560 report,
1561 format!("{path}.name"),
1562 format!("duplicate SBOM package: {}", package.name),
1563 None,
1564 );
1565 }
1566 if let Some(hash) = package.package_hash_blake3.as_deref() {
1567 validate_blake3_hash(report, format!("{path}.package_hash_blake3"), hash, true);
1568 }
1569 }
1570 for (index, relationship) in bundle.sbom.relationships.iter().enumerate() {
1571 let path = format!("sbom.relationships[{index}]");
1572 if relationship.from.trim().is_empty() {
1573 push_error(
1574 report,
1575 format!("{path}.from"),
1576 "SBOM relationship source is required",
1577 None,
1578 );
1579 }
1580 if relationship.to.trim().is_empty() {
1581 push_error(
1582 report,
1583 format!("{path}.to"),
1584 "SBOM relationship target is required",
1585 None,
1586 );
1587 }
1588 if relationship.relationship_type.trim().is_empty() {
1589 push_error(
1590 report,
1591 format!("{path}.relationship_type"),
1592 "SBOM relationship type is required",
1593 None,
1594 );
1595 }
1596 }
1597
1598 if let Some(parent_id) = bundle.parent_trust_record_id.as_deref() {
1599 if parent_id.trim().is_empty() {
1600 push_error(
1601 report,
1602 "parent_trust_record_id",
1603 "parent_trust_record_id cannot be empty when present",
1604 None,
1605 );
1606 }
1607 }
1608 if let Some(signature) = &bundle.signature {
1609 if signature.algorithm.trim().is_empty() {
1610 push_error(
1611 report,
1612 "signature.algorithm",
1613 "signature algorithm is required",
1614 None,
1615 );
1616 } else if signature.algorithm != "ed25519" {
1617 push_error(
1618 report,
1619 "signature.algorithm",
1620 "signature algorithm must be ed25519",
1621 None,
1622 );
1623 }
1624 if signature.public_key.trim().is_empty() {
1625 push_error(
1626 report,
1627 "signature.public_key",
1628 "signature public_key is required",
1629 None,
1630 );
1631 }
1632 if signature.signature.trim().is_empty() {
1633 push_error(
1634 report,
1635 "signature.signature",
1636 "signature value is required",
1637 None,
1638 );
1639 }
1640 validate_blake3_hash(
1641 report,
1642 "signature.manifest_hash_blake3",
1643 &signature.manifest_hash_blake3,
1644 true,
1645 );
1646 }
1647}
1648
1649fn validate_relative_path(
1650 report: &mut WorkflowBundleValidationReport,
1651 path: impl Into<String>,
1652 value: &Path,
1653 empty_message: &str,
1654) {
1655 let path = path.into();
1656 if value.as_os_str().is_empty() {
1657 push_error(report, path, empty_message, None);
1658 return;
1659 }
1660 if normalize_archive_path(value).is_err() {
1661 push_error(
1662 report,
1663 path,
1664 format!("path must be relative and contained: {}", value.display()),
1665 None,
1666 );
1667 }
1668}
1669
1670fn validate_blake3_hash(
1671 report: &mut WorkflowBundleValidationReport,
1672 path: impl Into<String>,
1673 value: &str,
1674 required: bool,
1675) {
1676 let path = path.into();
1677 if value.trim().is_empty() {
1678 if required {
1679 push_error(report, path, "BLAKE3 hash is required", None);
1680 }
1681 return;
1682 }
1683 let Some(hex) = value.strip_prefix("blake3:") else {
1684 push_error(report, path, "BLAKE3 hash must use blake3:<hex>", None);
1685 return;
1686 };
1687 if hex.len() != 64
1688 || !hex
1689 .bytes()
1690 .all(|byte| byte.is_ascii_digit() || matches!(byte, b'a'..=b'f'))
1691 {
1692 push_error(
1693 report,
1694 path,
1695 "BLAKE3 hash must contain 64 lowercase hex digits",
1696 None,
1697 );
1698 }
1699}
1700
1701fn validate_triggers(
1702 bundle: &WorkflowBundle,
1703 graph: &WorkflowGraph,
1704 report: &mut WorkflowBundleValidationReport,
1705) {
1706 if bundle.triggers.is_empty() {
1707 push_warning(report, "triggers", "bundle declares no triggers", None);
1708 }
1709 let mut ids = BTreeSet::new();
1710 for (index, trigger) in bundle.triggers.iter().enumerate() {
1711 let path = format!("triggers[{index}]");
1712 if trigger.id.trim().is_empty() {
1713 push_error(report, format!("{path}.id"), "trigger id is required", None);
1714 } else if !ids.insert(trigger.id.clone()) {
1715 push_error(
1716 report,
1717 format!("{path}.id"),
1718 format!("duplicate trigger id: {}", trigger.id),
1719 None,
1720 );
1721 }
1722 match trigger.kind.as_str() {
1723 "github" => {
1724 if trigger.provider.as_deref() != Some("github") {
1725 push_error(
1726 report,
1727 format!("{path}.provider"),
1728 "github triggers require provider=\"github\"",
1729 None,
1730 );
1731 }
1732 if trigger.events.is_empty() {
1733 push_error(
1734 report,
1735 format!("{path}.events"),
1736 "github triggers require at least one event",
1737 None,
1738 );
1739 }
1740 }
1741 "cron" if trigger.schedule.is_none() => push_error(
1742 report,
1743 format!("{path}.schedule"),
1744 "cron triggers require schedule",
1745 None,
1746 ),
1747 "delay" if trigger.delay.is_none() => push_error(
1748 report,
1749 format!("{path}.delay"),
1750 "delay triggers require delay",
1751 None,
1752 ),
1753 "webhook" if trigger.webhook_path.is_none() => push_error(
1754 report,
1755 format!("{path}.webhook_path"),
1756 "webhook triggers require webhook_path",
1757 None,
1758 ),
1759 "mcp" if trigger.mcp_tool.is_none() => push_error(
1760 report,
1761 format!("{path}.mcp_tool"),
1762 "mcp triggers require mcp_tool",
1763 None,
1764 ),
1765 "manual" => {}
1766 "" => push_error(
1767 report,
1768 format!("{path}.kind"),
1769 "trigger kind is required",
1770 None,
1771 ),
1772 other
1773 if !matches!(
1774 other,
1775 "github" | "cron" | "delay" | "webhook" | "mcp" | "manual"
1776 ) =>
1777 {
1778 push_error(
1779 report,
1780 format!("{path}.kind"),
1781 format!("unsupported trigger kind: {other}"),
1782 None,
1783 );
1784 }
1785 _ => {}
1786 }
1787 if let Some(node_id) = trigger.node_id.as_deref() {
1788 if !graph.nodes.contains_key(node_id) {
1789 push_error(
1790 report,
1791 format!("{path}.node_id"),
1792 format!("trigger references unknown node: {node_id}"),
1793 Some(node_id.to_string()),
1794 );
1795 }
1796 }
1797 }
1798}
1799
1800fn validate_prompt_capsules(
1801 bundle: &WorkflowBundle,
1802 graph: &WorkflowGraph,
1803 report: &mut WorkflowBundleValidationReport,
1804) {
1805 let trigger_ids: BTreeSet<&str> = bundle
1806 .triggers
1807 .iter()
1808 .map(|trigger| trigger.id.as_str())
1809 .collect();
1810 let mut node_refs = BTreeSet::new();
1811 for (key, capsule) in &bundle.prompt_capsules {
1812 let path = format!("prompt_capsules.{key}");
1813 if capsule.id.trim().is_empty() {
1814 push_error(
1815 report,
1816 format!("{path}.id"),
1817 "prompt capsule id is required",
1818 None,
1819 );
1820 } else if capsule.id != *key {
1821 push_error(
1822 report,
1823 format!("{path}.id"),
1824 "prompt capsule id must match its map key",
1825 None,
1826 );
1827 }
1828 if capsule.prompt.trim().is_empty() {
1829 push_error(
1830 report,
1831 format!("{path}.prompt"),
1832 "prompt capsule prompt is required",
1833 Some(capsule.node_id.clone()),
1834 );
1835 }
1836 if !graph.nodes.contains_key(&capsule.node_id) {
1837 push_error(
1838 report,
1839 format!("{path}.node_id"),
1840 format!(
1841 "prompt capsule references unknown node: {}",
1842 capsule.node_id
1843 ),
1844 Some(capsule.node_id.clone()),
1845 );
1846 }
1847 if !capsule.node_id.is_empty() && !node_refs.insert(capsule.node_id.clone()) {
1848 push_error(
1849 report,
1850 format!("{path}.node_id"),
1851 format!("multiple prompt capsules target node {}", capsule.node_id),
1852 Some(capsule.node_id.clone()),
1853 );
1854 }
1855 if let Some(trigger_id) = capsule.trigger_id.as_deref() {
1856 if !trigger_ids.contains(trigger_id) {
1857 push_error(
1858 report,
1859 format!("{path}.trigger_id"),
1860 format!("prompt capsule references unknown trigger: {trigger_id}"),
1861 Some(capsule.node_id.clone()),
1862 );
1863 }
1864 }
1865 }
1866}
1867
1868fn validate_policy(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1869 if !matches!(
1870 bundle.policy.autonomy_tier.as_str(),
1871 "shadow" | "suggest" | "act_with_approval" | "act_auto"
1872 ) {
1873 push_error(
1874 report,
1875 "policy.autonomy_tier",
1876 "autonomy_tier must be shadow, suggest, act_with_approval, or act_auto",
1877 None,
1878 );
1879 }
1880 if bundle.policy.retry.max_attempts == 0 {
1881 push_error(
1882 report,
1883 "policy.retry.max_attempts",
1884 "retry.max_attempts must be at least 1",
1885 None,
1886 );
1887 }
1888 if !matches!(
1889 bundle.policy.catchup.mode.as_str(),
1890 "none" | "latest" | "all"
1891 ) {
1892 push_error(
1893 report,
1894 "policy.catchup.mode",
1895 "catchup.mode must be none, latest, or all",
1896 None,
1897 );
1898 }
1899}
1900
1901fn validate_connectors(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1902 let mut ids = BTreeSet::new();
1903 let provider_ids: BTreeSet<&str> = bundle
1904 .connectors
1905 .iter()
1906 .map(|connector| connector.provider_id.as_str())
1907 .collect();
1908 for (index, connector) in bundle.connectors.iter().enumerate() {
1909 let path = format!("connectors[{index}]");
1910 if connector.id.trim().is_empty() {
1911 push_error(
1912 report,
1913 format!("{path}.id"),
1914 "connector id is required",
1915 None,
1916 );
1917 } else if !ids.insert(connector.id.clone()) {
1918 push_error(
1919 report,
1920 format!("{path}.id"),
1921 format!("duplicate connector id: {}", connector.id),
1922 None,
1923 );
1924 }
1925 if connector.provider_id.trim().is_empty() {
1926 push_error(
1927 report,
1928 format!("{path}.provider_id"),
1929 "connector provider_id is required",
1930 None,
1931 );
1932 }
1933 }
1934 for trigger in &bundle.triggers {
1935 if let Some(provider) = trigger.provider.as_deref() {
1936 if !provider_ids.contains(provider) {
1937 push_warning(
1938 report,
1939 "connectors",
1940 format!(
1941 "trigger {} references provider {provider} with no connector requirement",
1942 trigger.id
1943 ),
1944 trigger.node_id.clone(),
1945 );
1946 }
1947 }
1948 }
1949}
1950
1951fn validate_environment(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1952 if !matches!(
1953 bundle.environment.worktree_policy.as_str(),
1954 "reuse_current" | "new_worktree" | "host_managed"
1955 ) {
1956 push_error(
1957 report,
1958 "environment.worktree_policy",
1959 "worktree_policy must be reuse_current, new_worktree, or host_managed",
1960 None,
1961 );
1962 }
1963}
1964
1965fn workflow_diagnostic_node_id(message: &str, graph: &WorkflowGraph) -> Option<String> {
1966 for prefix in [
1967 "node is unreachable: ",
1968 "edge.from references unknown node: ",
1969 "edge.to references unknown node: ",
1970 "entry node does not exist: ",
1971 ] {
1972 if let Some(node_id) = message.strip_prefix(prefix) {
1973 return Some(node_id.to_string());
1974 }
1975 }
1976 if let Some(rest) = message.strip_prefix("node ") {
1977 if let Some((node_id, _)) = rest.split_once(':') {
1978 return Some(node_id.to_string());
1979 }
1980 }
1981 graph
1982 .nodes
1983 .keys()
1984 .find(|node_id| message.contains(&format!("node {node_id}:")))
1985 .cloned()
1986}
1987
1988fn workflow_graph_id(node_id: &str) -> String {
1989 format!("node/{node_id}")
1990}
1991
1992fn trigger_graph_id(trigger_id: &str) -> String {
1993 format!("trigger/{trigger_id}")
1994}
1995
1996fn connector_graph_id(connector_id: &str) -> String {
1997 format!("connector/{connector_id}")
1998}
1999
2000fn catchup_graph_id() -> String {
2001 "policy/catchup".to_string()
2002}
2003
2004fn dlq_graph_id() -> String {
2005 "policy/dlq".to_string()
2006}
2007
2008fn terminal_completed_graph_id() -> String {
2009 "terminal/completed".to_string()
2010}
2011
2012fn terminal_failed_graph_id() -> String {
2013 "terminal/failed".to_string()
2014}
2015
2016fn workflow_node_type(kind: &str) -> String {
2017 match kind {
2018 "action" => "action",
2019 "stage" | "agent" => "agent",
2020 "subagent" | "worker" => "subagent",
2021 "wait" | "waitpoint" | "delay" => "wait",
2022 "approval" | "hitl" => "approval",
2023 "connector" | "connector_call" => "connector_call",
2024 "notification" | "notify" => "notification",
2025 "terminal" | "success" | "failure" => "terminal",
2026 other if other.trim().is_empty() => "agent",
2027 other => other,
2028 }
2029 .to_string()
2030}
2031
2032fn workflow_node_label(node_id: &str, node: &super::WorkflowNode) -> String {
2033 node.task_label
2034 .clone()
2035 .or_else(|| node.prompt.clone())
2036 .map(|label| label.trim().to_string())
2037 .filter(|label| !label.is_empty())
2038 .unwrap_or_else(|| node_id.to_string())
2039}
2040
2041fn trigger_label(trigger: &WorkflowBundleTrigger) -> String {
2042 if !trigger.events.is_empty() {
2043 format!("{}: {}", trigger.kind, trigger.events.join(", "))
2044 } else if let Some(schedule) = trigger.schedule.as_deref() {
2045 format!("cron: {schedule}")
2046 } else if let Some(delay) = trigger.delay.as_deref() {
2047 format!("delay: {delay}")
2048 } else {
2049 trigger.id.clone()
2050 }
2051}
2052
2053fn connector_label(connector: &ConnectorRequirement) -> String {
2054 if connector.provider_id.is_empty() || connector.provider_id == connector.id {
2055 connector.id.clone()
2056 } else {
2057 format!("{} ({})", connector.id, connector.provider_id)
2058 }
2059}
2060
2061fn editable_field(
2062 id: impl Into<String>,
2063 label: impl Into<String>,
2064 json_pointer: impl Into<String>,
2065 value_type: impl Into<String>,
2066 required: bool,
2067 enum_values: &[&str],
2068) -> WorkflowBundleEditableField {
2069 WorkflowBundleEditableField {
2070 id: id.into(),
2071 label: label.into(),
2072 json_pointer: json_pointer.into(),
2073 value_type: value_type.into(),
2074 required,
2075 enum_values: enum_values
2076 .iter()
2077 .map(|value| (*value).to_string())
2078 .collect(),
2079 }
2080}
2081
2082fn json_pointer_segment(value: &str) -> String {
2083 value.replace('~', "~0").replace('/', "~1")
2084}
2085
2086fn trigger_editable_fields(
2087 index: usize,
2088 trigger: &WorkflowBundleTrigger,
2089) -> Vec<WorkflowBundleEditableField> {
2090 let base = format!("/triggers/{index}");
2091 let mut fields = vec![
2092 editable_field(
2093 format!("trigger.{}.kind", trigger.id),
2094 "Trigger kind",
2095 format!("{base}/kind"),
2096 "enum",
2097 true,
2098 &["github", "cron", "delay", "manual", "webhook", "mcp"],
2099 ),
2100 editable_field(
2101 format!("trigger.{}.node_id", trigger.id),
2102 "Target node",
2103 format!("{base}/node_id"),
2104 "string",
2105 false,
2106 &[],
2107 ),
2108 ];
2109 if trigger.provider.is_some() || trigger.kind == "github" {
2110 fields.push(editable_field(
2111 format!("trigger.{}.provider", trigger.id),
2112 "Provider",
2113 format!("{base}/provider"),
2114 "string",
2115 trigger.kind == "github",
2116 &[],
2117 ));
2118 }
2119 for (field, label, value_type) in [
2120 ("events", "Events", "list"),
2121 ("schedule", "Schedule", "string"),
2122 ("delay", "Delay", "string"),
2123 ("webhook_path", "Webhook path", "string"),
2124 ("mcp_tool", "MCP tool", "string"),
2125 ("resume_key", "Resume key", "string"),
2126 ("metadata", "Metadata", "object"),
2127 ] {
2128 fields.push(editable_field(
2129 format!("trigger.{}.{}", trigger.id, field),
2130 label,
2131 format!("{base}/{field}"),
2132 value_type,
2133 false,
2134 &[],
2135 ));
2136 }
2137 fields
2138}
2139
2140fn workflow_node_editable_fields(
2141 node_id: &str,
2142 capsule_id: Option<&String>,
2143) -> Vec<WorkflowBundleEditableField> {
2144 let escaped_node = json_pointer_segment(node_id);
2145 let mut fields = vec![
2146 editable_field(
2147 format!("workflow.{node_id}.task_label"),
2148 "Task label",
2149 format!("/workflow/nodes/{escaped_node}/task_label"),
2150 "string",
2151 false,
2152 &[],
2153 ),
2154 editable_field(
2155 format!("workflow.{node_id}.prompt"),
2156 "Prompt",
2157 format!("/workflow/nodes/{escaped_node}/prompt"),
2158 "string",
2159 false,
2160 &[],
2161 ),
2162 editable_field(
2163 format!("workflow.{node_id}.system"),
2164 "System prompt",
2165 format!("/workflow/nodes/{escaped_node}/system"),
2166 "string",
2167 false,
2168 &[],
2169 ),
2170 editable_field(
2171 format!("workflow.{node_id}.model_policy"),
2172 "Model policy",
2173 format!("/workflow/nodes/{escaped_node}/model_policy"),
2174 "object",
2175 false,
2176 &[],
2177 ),
2178 editable_field(
2179 format!("workflow.{node_id}.tools"),
2180 "Tool policy",
2181 format!("/workflow/nodes/{escaped_node}/tools"),
2182 "any",
2183 false,
2184 &[],
2185 ),
2186 editable_field(
2187 format!("workflow.{node_id}.capability_policy"),
2188 "Capability policy",
2189 format!("/workflow/nodes/{escaped_node}/capability_policy"),
2190 "object",
2191 false,
2192 &[],
2193 ),
2194 editable_field(
2195 format!("workflow.{node_id}.approval_policy"),
2196 "Approval policy",
2197 format!("/workflow/nodes/{escaped_node}/approval_policy"),
2198 "object",
2199 false,
2200 &[],
2201 ),
2202 editable_field(
2203 format!("workflow.{node_id}.retry_policy"),
2204 "Retry policy",
2205 format!("/workflow/nodes/{escaped_node}/retry_policy"),
2206 "object",
2207 false,
2208 &[],
2209 ),
2210 ];
2211 if let Some(capsule_id) = capsule_id {
2212 let escaped_capsule = json_pointer_segment(capsule_id);
2213 fields.extend([
2214 editable_field(
2215 format!("prompt_capsule.{capsule_id}.prompt"),
2216 "Prompt capsule",
2217 format!("/prompt_capsules/{escaped_capsule}/prompt"),
2218 "string",
2219 true,
2220 &[],
2221 ),
2222 editable_field(
2223 format!("prompt_capsule.{capsule_id}.system"),
2224 "Prompt capsule system",
2225 format!("/prompt_capsules/{escaped_capsule}/system"),
2226 "string",
2227 false,
2228 &[],
2229 ),
2230 editable_field(
2231 format!("prompt_capsule.{capsule_id}.context"),
2232 "Prompt capsule context",
2233 format!("/prompt_capsules/{escaped_capsule}/context"),
2234 "object",
2235 false,
2236 &[],
2237 ),
2238 editable_field(
2239 format!("prompt_capsule.{capsule_id}.trigger_id"),
2240 "Prompt capsule trigger",
2241 format!("/prompt_capsules/{escaped_capsule}/trigger_id"),
2242 "string",
2243 false,
2244 &[],
2245 ),
2246 ]);
2247 }
2248 fields
2249}
2250
2251fn connector_editable_fields(
2252 index: usize,
2253 connector: &ConnectorRequirement,
2254) -> Vec<WorkflowBundleEditableField> {
2255 let base = format!("/connectors/{index}");
2256 [
2257 ("id", "Connector id", "string", true),
2258 ("provider_id", "Provider id", "string", true),
2259 ("scopes", "Scopes", "list", false),
2260 ("setup_required", "Setup required", "bool", false),
2261 ("status_required", "Status required", "bool", false),
2262 ]
2263 .into_iter()
2264 .map(|(field, label, value_type, required)| {
2265 editable_field(
2266 format!("connector.{}.{}", connector.id, field),
2267 label,
2268 format!("{base}/{field}"),
2269 value_type,
2270 required,
2271 &[],
2272 )
2273 })
2274 .collect()
2275}
2276
2277fn retry_editable_fields() -> Vec<WorkflowBundleEditableField> {
2278 vec![
2279 editable_field(
2280 "policy.retry.max_attempts",
2281 "Retry attempts",
2282 "/policy/retry/max_attempts",
2283 "integer",
2284 true,
2285 &[],
2286 ),
2287 editable_field(
2288 "policy.retry.backoff",
2289 "Retry backoff",
2290 "/policy/retry/backoff",
2291 "string",
2292 true,
2293 &[],
2294 ),
2295 ]
2296}
2297
2298fn catchup_editable_fields() -> Vec<WorkflowBundleEditableField> {
2299 vec![
2300 editable_field(
2301 "policy.catchup.mode",
2302 "Catchup mode",
2303 "/policy/catchup/mode",
2304 "enum",
2305 true,
2306 &["none", "latest", "all"],
2307 ),
2308 editable_field(
2309 "policy.catchup.max_events",
2310 "Catchup max events",
2311 "/policy/catchup/max_events",
2312 "integer",
2313 false,
2314 &[],
2315 ),
2316 ]
2317}
2318
2319fn render_workflow_bundle_mermaid(
2320 nodes: &[WorkflowBundleGraphNode],
2321 edges: &[WorkflowBundleGraphEdge],
2322) -> String {
2323 let mut lines = vec!["flowchart TD".to_string()];
2324 for node in nodes {
2325 lines.push(format!(
2326 " {}[\"{}\"]",
2327 mermaid_id(&node.id),
2328 mermaid_label(&format!("{}: {}", node.node_type, node.label))
2329 ));
2330 }
2331 for edge in edges {
2332 let label = edge
2333 .label
2334 .as_deref()
2335 .or(edge.branch.as_deref())
2336 .map(mermaid_label);
2337 match label {
2338 Some(label) if !label.is_empty() => lines.push(format!(
2339 " {} -->|{}| {}",
2340 mermaid_id(&edge.from),
2341 label,
2342 mermaid_id(&edge.to)
2343 )),
2344 _ => lines.push(format!(
2345 " {} --> {}",
2346 mermaid_id(&edge.from),
2347 mermaid_id(&edge.to)
2348 )),
2349 }
2350 }
2351 lines.join("\n")
2352}
2353
2354fn mermaid_id(value: &str) -> String {
2355 let digest = Sha256::digest(value.as_bytes());
2356 let suffix = digest
2357 .iter()
2358 .take(4)
2359 .map(|byte| format!("{byte:02x}"))
2360 .collect::<String>();
2361 let mut out = format!("n_{suffix}_");
2362 for ch in value.chars() {
2363 if ch.is_ascii_alphanumeric() {
2364 out.push(ch);
2365 } else {
2366 out.push('_');
2367 }
2368 }
2369 out
2370}
2371
2372fn mermaid_label(value: &str) -> String {
2373 value
2374 .replace('\\', "\\\\")
2375 .replace('"', "\\\"")
2376 .replace('\n', " ")
2377}
2378
2379fn triggers_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, Vec<String>> {
2380 let mut by_node: BTreeMap<String, Vec<String>> = BTreeMap::new();
2381 for trigger in &bundle.triggers {
2382 if let Some(node_id) = trigger.node_id.as_ref() {
2383 by_node
2384 .entry(node_id.clone())
2385 .or_default()
2386 .push(trigger.id.clone());
2387 }
2388 }
2389 by_node
2390}
2391
2392fn capsules_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, String> {
2393 bundle
2394 .prompt_capsules
2395 .iter()
2396 .map(|(id, capsule)| (capsule.node_id.clone(), id.clone()))
2397 .collect()
2398}
2399
2400fn execution_order(graph: &WorkflowGraph) -> Vec<String> {
2401 let outgoing =
2402 graph
2403 .edges
2404 .iter()
2405 .fold(BTreeMap::<String, Vec<String>>::new(), |mut acc, edge| {
2406 acc.entry(edge.from.clone())
2407 .or_default()
2408 .push(edge.to.clone());
2409 acc
2410 });
2411 let mut seen = BTreeSet::new();
2412 let mut queue = VecDeque::from([graph.entry.clone()]);
2413 let mut order = Vec::new();
2414 while let Some(node_id) = queue.pop_front() {
2415 if !graph.nodes.contains_key(&node_id) || !seen.insert(node_id.clone()) {
2416 continue;
2417 }
2418 order.push(node_id.clone());
2419 if let Some(next) = outgoing.get(&node_id) {
2420 let mut next = next.clone();
2421 next.sort();
2422 for child in next {
2423 queue.push_back(child);
2424 }
2425 }
2426 }
2427 order
2428}
2429
2430fn default_run_id(bundle: &WorkflowBundle, graph_digest: &str) -> String {
2431 let suffix = graph_digest
2432 .strip_prefix("sha256:")
2433 .unwrap_or(graph_digest)
2434 .chars()
2435 .take(12)
2436 .collect::<String>();
2437 format!("bundle_run_{}_{}", sanitize_id(&bundle.id), suffix)
2438}
2439
2440fn sanitize_id(value: &str) -> String {
2441 value
2442 .chars()
2443 .map(|ch| {
2444 if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' {
2445 ch
2446 } else {
2447 '_'
2448 }
2449 })
2450 .collect()
2451}
2452
2453fn push_error(
2454 report: &mut WorkflowBundleValidationReport,
2455 path: impl Into<String>,
2456 message: impl Into<String>,
2457 node_id: Option<String>,
2458) {
2459 report.errors.push(WorkflowBundleDiagnostic {
2460 severity: "error".to_string(),
2461 path: path.into(),
2462 message: message.into(),
2463 node_id,
2464 });
2465}
2466
2467fn push_warning(
2468 report: &mut WorkflowBundleValidationReport,
2469 path: impl Into<String>,
2470 message: impl Into<String>,
2471 node_id: Option<String>,
2472) {
2473 report.warnings.push(WorkflowBundleDiagnostic {
2474 severity: "warning".to_string(),
2475 path: path.into(),
2476 message: message.into(),
2477 node_id,
2478 });
2479}
2480
2481#[cfg(test)]
2482mod tests;