1use std::collections::{BTreeMap, BTreeSet, VecDeque};
5use std::fs;
6use std::io::{Cursor, Read};
7use std::path::{Component, Path, PathBuf};
8
9use ed25519_dalek::{Signature, Verifier, VerifyingKey};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12
13use super::{validate_workflow, WorkflowEdge, WorkflowGraph};
14use crate::tool_annotations::ToolAnnotations;
15
/// Manifest schema version written by `WorkflowBundle::default` and the only
/// version accepted by `parse_workflow_bundle_manifest`.
pub const WORKFLOW_BUNDLE_SCHEMA_VERSION: u32 = 2;
/// Receipt type identifier for workflow bundle runs.
/// NOTE(review): presumably the value of `WorkflowBundleRunReceipt::receipt_type`;
/// the code that fills it in is not part of this section — confirm.
pub const WORKFLOW_BUNDLE_RECEIPT_TYPE: &str = "harn.workflow_bundle.run";
/// Archive-relative path under which the manifest is stored inside a harnpack.
pub const HARNPACK_MANIFEST_PATH: &str = "harnpack.json";

/// File mode used for the manifest entry, for entries built with
/// `HarnpackEntry::new`, and as fallback when a tar header mode cannot be read.
const DEFAULT_HARNPACK_FILE_MODE: u32 = 0o644;

/// Upper bound (100 MiB) on the decompressed size of a harnpack payload;
/// enforced by `decompress_harnpack_zstd`.
const MAX_HARNPACK_DECOMPRESSED_BYTES: u64 = 100 * 1024 * 1024;
28
29fn decompress_harnpack_zstd(bytes: &[u8]) -> Result<Vec<u8>, WorkflowBundleError> {
34 let mut decoder = zstd::stream::Decoder::new(Cursor::new(bytes))?;
35 let mut output = Vec::new();
36 let mut limited = (&mut decoder).take(MAX_HARNPACK_DECOMPRESSED_BYTES.saturating_add(1));
37 limited.read_to_end(&mut output)?;
38 if output.len() as u64 > MAX_HARNPACK_DECOMPRESSED_BYTES {
39 return Err(WorkflowBundleError::new(
40 WorkflowBundleErrorKind::InvalidArchive,
41 format!(
42 "harnpack zstd payload decompressed past max size ({MAX_HARNPACK_DECOMPRESSED_BYTES} bytes); refusing to extract"
43 ),
44 ));
45 }
46 Ok(output)
47}
48
/// Manifest describing a workflow bundle.
///
/// Parsed from JSON by `parse_workflow_bundle_manifest` and written to
/// `HARNPACK_MANIFEST_PATH` by `build_harnpack`. Because of
/// `#[serde(default)]`, fields missing from the JSON take the value produced by
/// the `Default` impl.
///
/// Field declaration order is the serialization order and therefore feeds
/// `workflow_bundle_hash`; do not reorder fields.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowBundle {
    /// Must equal `WORKFLOW_BUNDLE_SCHEMA_VERSION` for the strict parser to accept the manifest.
    pub schema_version: u32,
    pub entrypoint: PathBuf,
    /// Sorted by path and hashes when the manifest is canonicalized.
    pub transitive_modules: Vec<ModuleEntry>,
    /// Defaults to the crate version (`CARGO_PKG_VERSION`).
    pub stdlib_version: String,
    /// Defaults to the crate version (`CARGO_PKG_VERSION`).
    pub harn_version: String,
    pub provider_catalog_hash: String,
    /// Sorted by name, provider and schema hash when the manifest is canonicalized.
    pub tool_manifest: Vec<ToolEntry>,
    /// Packages and relationships are sorted when the manifest is canonicalized.
    pub sbom: SBOMDoc,
    /// Cleared before hashing in `workflow_bundle_hash`, so the signature does not sign itself.
    pub signature: Option<Ed25519Signature>,
    pub parent_trust_record_id: Option<String>,
    pub id: String,
    pub name: Option<String>,
    pub version: String,
    pub triggers: Vec<WorkflowBundleTrigger>,
    /// Canonicalized via `canonical_workflow_graph` before hashing, validation and preview.
    pub workflow: WorkflowGraph,
    pub prompt_capsules: BTreeMap<String, PromptCapsule>,
    pub policy: WorkflowBundlePolicy,
    pub connectors: Vec<ConnectorRequirement>,
    pub environment: EnvironmentRequirements,
    /// `graph_digest` and `workflow_version`, when set, are checked by `validate_workflow_bundle`.
    pub receipts: WorkflowBundleReplayMetadata,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
74
75impl Default for WorkflowBundle {
76 fn default() -> Self {
77 Self {
78 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
79 entrypoint: PathBuf::new(),
80 transitive_modules: Vec::new(),
81 stdlib_version: env!("CARGO_PKG_VERSION").to_string(),
82 harn_version: env!("CARGO_PKG_VERSION").to_string(),
83 provider_catalog_hash: String::new(),
84 tool_manifest: Vec::new(),
85 sbom: SBOMDoc::default(),
86 signature: None,
87 parent_trust_record_id: None,
88 id: String::new(),
89 name: None,
90 version: String::new(),
91 triggers: Vec::new(),
92 workflow: WorkflowGraph::default(),
93 prompt_capsules: BTreeMap::new(),
94 policy: WorkflowBundlePolicy::default(),
95 connectors: Vec::new(),
96 environment: EnvironmentRequirements::default(),
97 receipts: WorkflowBundleReplayMetadata::default(),
98 metadata: BTreeMap::new(),
99 }
100 }
101}
102
/// One element of `WorkflowBundle::transitive_modules`.
///
/// Canonicalization orders these entries by `path`, then `source_hash_blake3`,
/// then `harnbc_hash_blake3`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ModuleEntry {
    pub path: PathBuf,
    // NOTE(review): presumably a BLAKE3 digest of the module source; it is not computed here.
    pub source_hash_blake3: String,
    // NOTE(review): presumably a BLAKE3 digest of the compiled module; it is not computed here.
    pub harnbc_hash_blake3: String,
}
110
/// One element of `WorkflowBundle::tool_manifest`.
///
/// Canonicalization orders these entries by `name`, then `provider`, then
/// `schema_hash_blake3`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolEntry {
    pub name: String,
    pub provider: Option<String>,
    pub annotations: Option<ToolAnnotations>,
    pub schema_hash_blake3: Option<String>,
    pub metadata: BTreeMap<String, String>,
}
120
/// Software bill of materials embedded in a `WorkflowBundle`.
///
/// `packages` and `relationships` are sorted when the manifest is canonicalized.
// The lint is silenced so the type name can keep the upper-case "SBOM" acronym.
#[allow(clippy::upper_case_acronyms)]
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct SBOMDoc {
    pub format: String,
    pub version: String,
    pub packages: Vec<SBOMPackage>,
    pub relationships: Vec<SBOMRelationship>,
}
130
/// One package listed in `SBOMDoc::packages`.
///
/// Canonicalization orders packages by `name`, then `version`, then
/// `package_hash_blake3`.
// The lint is silenced so the type name can keep the upper-case "SBOM" acronym.
#[allow(clippy::upper_case_acronyms)]
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct SBOMPackage {
    pub name: String,
    pub version: Option<String>,
    pub package_hash_blake3: Option<String>,
    pub license: Option<String>,
}
140
/// One relationship listed in `SBOMDoc::relationships`.
///
/// Canonicalization orders relationships by `from`, then `to`, then
/// `relationship_type`.
// The lint is silenced so the type name can keep the upper-case "SBOM" acronym.
#[allow(clippy::upper_case_acronyms)]
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct SBOMRelationship {
    pub from: String,
    pub to: String,
    pub relationship_type: String,
}
149
/// Detached signature stored in `WorkflowBundle::signature` and checked by
/// `verify_workflow_bundle_signature`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct Ed25519Signature {
    // Not consulted by the verification code in this file.
    pub key_id: Option<String>,
    /// Hex string that must decode to exactly 32 bytes (the Ed25519 public key).
    pub public_key: String,
    /// Hex string that must decode to exactly 64 bytes (the Ed25519 signature).
    pub signature: String,
    /// Must equal the output of `workflow_bundle_hash`; the signature is verified
    /// over the bytes of this hash string.
    pub manifest_hash_blake3: String,
    /// Must be `"ed25519"`; any other value is rejected during verification.
    pub algorithm: String,
}
159
/// Trigger declaration listed in `WorkflowBundle::triggers`.
///
/// The graph export turns each trigger into a node of type `"trigger"` whose
/// metadata carries `kind`, `provider` and `events`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleTrigger {
    pub id: String,
    pub kind: String,
    pub provider: Option<String>,
    pub events: Vec<String>,
    pub schedule: Option<String>,
    pub delay: Option<String>,
    pub webhook_path: Option<String>,
    pub mcp_tool: Option<String>,
    pub resume_key: Option<String>,
    /// Copied into the exported graph node's `workflow_node_id`.
    pub node_id: Option<String>,
    pub metadata: BTreeMap<String, String>,
}
175
/// Prompt payload stored in `WorkflowBundle::prompt_capsules`.
// NOTE(review): `node_id` presumably names the workflow node the capsule belongs
// to (see `capsules_by_node`, defined outside this section) — confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PromptCapsule {
    pub id: String,
    pub node_id: String,
    pub trigger_id: Option<String>,
    pub prompt: String,
    pub system: Option<String>,
    pub context: BTreeMap<String, serde_json::Value>,
}
186
/// Execution policy attached to a `WorkflowBundle`.
///
/// Fields missing from the JSON take the values of the `Default` impl
/// (`autonomy_tier` defaults to `"act_with_approval"`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundlePolicy {
    pub autonomy_tier: String,
    pub tool_policy: BTreeMap<String, serde_json::Value>,
    pub approval_required: Vec<String>,
    pub retry: RetryPolicySpec,
    pub catchup: CatchupPolicySpec,
}
196
197impl Default for WorkflowBundlePolicy {
198 fn default() -> Self {
199 Self {
200 autonomy_tier: "act_with_approval".to_string(),
201 tool_policy: BTreeMap::new(),
202 approval_required: Vec::new(),
203 retry: RetryPolicySpec::default(),
204 catchup: CatchupPolicySpec::default(),
205 }
206 }
207}
208
/// Retry settings within `WorkflowBundlePolicy`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RetryPolicySpec {
    /// Defaults to 1. The graph export adds a `"dlq"` node only when this is greater than 1.
    pub max_attempts: u32,
    /// Defaults to `"none"`.
    pub backoff: String,
}
215
216impl Default for RetryPolicySpec {
217 fn default() -> Self {
218 Self {
219 max_attempts: 1,
220 backoff: "none".to_string(),
221 }
222 }
223}
224
/// Catch-up settings within `WorkflowBundlePolicy`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CatchupPolicySpec {
    /// Defaults to `"latest"`. The graph export adds a `"catchup"` node unless this is `"none"`.
    pub mode: String,
    /// Defaults to `Some(1)`.
    pub max_events: Option<u32>,
}
231
232impl Default for CatchupPolicySpec {
233 fn default() -> Self {
234 Self {
235 mode: "latest".to_string(),
236 max_events: Some(1),
237 }
238 }
239}
240
/// Connector a bundle depends on, listed in `WorkflowBundle::connectors`.
///
/// The graph export turns each connector into a node of type
/// `"connector_call"` whose metadata carries `provider_id` and `scopes`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ConnectorRequirement {
    pub id: String,
    pub provider_id: String,
    pub scopes: Vec<String>,
    pub setup_required: bool,
    pub status_required: bool,
}
250
/// Environment requirements attached to a `WorkflowBundle`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EnvironmentRequirements {
    pub repo_setup_profile: Option<String>,
    /// Defaults to `"host_managed"`.
    pub worktree_policy: String,
    pub command_gates: Vec<String>,
}
258
259impl Default for EnvironmentRequirements {
260 fn default() -> Self {
261 Self {
262 repo_setup_profile: None,
263 worktree_policy: "host_managed".to_string(),
264 command_gates: Vec::new(),
265 }
266 }
267}
268
/// Replay metadata stored in `WorkflowBundle::receipts`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleReplayMetadata {
    pub run_id: Option<String>,
    pub event_ids: Vec<String>,
    /// When set, validation reports an error if it differs from the canonical graph's version.
    pub workflow_version: Option<usize>,
    /// When set, validation reports an error if it differs from the computed graph digest.
    pub graph_digest: Option<String>,
}
277
/// Single error or warning recorded in a `WorkflowBundleValidationReport`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleDiagnostic {
    pub severity: String,
    /// Location of the finding, e.g. `"workflow"` or `"receipts.graph_digest"`.
    pub path: String,
    pub message: String,
    pub node_id: Option<String>,
}
286
/// Result of `validate_workflow_bundle`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleValidationReport {
    /// `true` exactly when `errors` is empty at the end of validation.
    pub valid: bool,
    pub bundle_id: String,
    /// Id of the canonicalized workflow graph.
    pub workflow_id: String,
    /// Digest computed by `workflow_graph_digest` for the canonical graph.
    pub graph_digest: String,
    pub errors: Vec<WorkflowBundleDiagnostic>,
    pub warnings: Vec<WorkflowBundleDiagnostic>,
}
297
/// Preview of a bundle, produced by `preview_workflow_bundle`.
///
/// Combines the validation report, the graph export and per-node summaries of
/// the canonicalized workflow.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundlePreview {
    /// Always `WORKFLOW_BUNDLE_SCHEMA_VERSION` when built by `preview_workflow_bundle`.
    pub schema_version: u32,
    pub bundle_id: String,
    pub bundle_version: String,
    pub workflow_id: String,
    pub workflow_version: usize,
    /// Copy of `validation.graph_digest`.
    pub graph_digest: String,
    pub validation: WorkflowBundleValidationReport,
    pub graph: WorkflowBundleGraphExport,
    /// Copy of `graph.mermaid`.
    pub mermaid: String,
    pub triggers: Vec<WorkflowBundleTrigger>,
    pub connectors: Vec<ConnectorRequirement>,
    pub environment: EnvironmentRequirements,
    pub nodes: Vec<WorkflowBundlePreviewNode>,
    pub edges: Vec<WorkflowEdge>,
}
315
/// Per-node summary inside a `WorkflowBundlePreview`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundlePreviewNode {
    pub id: String,
    pub kind: String,
    /// Taken from the workflow node's `task_label`.
    pub label: Option<String>,
    pub prompt_capsule: Option<String>,
    pub trigger_ids: Vec<String>,
    /// Sorted, de-duplicated targets of the edges leaving this node.
    pub outgoing: Vec<String>,
}
325
/// Graph representation of a bundle, returned by `export_workflow_bundle_graph`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleGraphExport {
    pub schema_version: u32,
    pub graph_id: String,
    pub graph_digest: String,
    pub nodes: Vec<WorkflowBundleGraphNode>,
    pub edges: Vec<WorkflowBundleGraphEdge>,
    pub diagnostics: Vec<WorkflowBundleGraphDiagnostic>,
    pub editable_fields: Vec<WorkflowBundleEditableField>,
    /// Reused verbatim as `WorkflowBundlePreview::mermaid`.
    pub mermaid: String,
}
337
/// Node of a `WorkflowBundleGraphExport`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleGraphNode {
    pub id: String,
    /// The export emits at least `"connector_call"`, `"catchup"`, `"dlq"` and `"trigger"`.
    pub node_type: String,
    pub label: String,
    pub workflow_node_id: Option<String>,
    /// Set for trigger nodes.
    pub trigger_id: Option<String>,
    /// Set for connector nodes.
    pub connector_id: Option<String>,
    pub editable_fields: Vec<WorkflowBundleEditableField>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
349
/// Edge of a `WorkflowBundleGraphExport`.
// NOTE(review): `from`/`to` presumably hold `WorkflowBundleGraphNode::id` values — confirm
// against the edge-building code, which is outside this section.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleGraphEdge {
    pub from: String,
    pub to: String,
    pub label: Option<String>,
    pub branch: Option<String>,
}
357
/// Diagnostic attached to a `WorkflowBundleGraphExport`.
///
/// Has the same fields as `WorkflowBundleDiagnostic` plus `graph_node_id`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleGraphDiagnostic {
    pub severity: String,
    pub path: String,
    pub message: String,
    pub node_id: Option<String>,
    pub graph_node_id: Option<String>,
}
366
/// Description of one editable field, attached to graph nodes and to the
/// export as a whole.
// NOTE(review): `json_pointer` presumably addresses the field inside the bundle
// manifest JSON — confirm against the helpers that build these values.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleEditableField {
    pub id: String,
    pub label: String,
    pub json_pointer: String,
    pub value_type: String,
    pub required: bool,
    pub enum_values: Vec<String>,
}
376
/// Request parameters for running a bundle; both fields are optional and
/// default to `None`.
// NOTE(review): the code that consumes this request is outside this section.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleRunRequest {
    pub trigger_id: Option<String>,
    pub event_id: Option<String>,
}
383
/// Receipt describing one run of a workflow bundle.
// NOTE(review): the code that builds receipts is outside this section;
// `receipt_type` is presumably `WORKFLOW_BUNDLE_RECEIPT_TYPE` — confirm.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleRunReceipt {
    pub schema_version: u32,
    pub receipt_type: String,
    pub bundle_id: String,
    pub bundle_version: String,
    pub workflow_id: String,
    pub workflow_version: usize,
    pub graph_digest: String,
    pub run_id: String,
    pub trigger_id: Option<String>,
    pub event_ids: Vec<String>,
    pub status: String,
    pub executed_nodes: Vec<WorkflowBundleRunNodeReceipt>,
    pub policy: WorkflowBundlePolicy,
    pub connectors: Vec<ConnectorRequirement>,
    pub environment: EnvironmentRequirements,
}
402
/// Per-node entry of `WorkflowBundleRunReceipt::executed_nodes`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleRunNodeReceipt {
    pub node_id: String,
    pub kind: String,
    pub prompt_capsule: Option<String>,
    pub status: String,
}
410
/// One non-manifest file stored in a harnpack archive.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HarnpackEntry {
    /// Archive path; must be relative and free of `..` to be accepted by `build_harnpack`.
    pub path: PathBuf,
    /// Raw file contents.
    pub bytes: Vec<u8>,
    /// File mode written to / read from the tar header.
    pub mode: u32,
}
417
418impl HarnpackEntry {
419 pub fn new(path: impl Into<PathBuf>, bytes: impl Into<Vec<u8>>) -> Self {
420 Self {
421 path: path.into(),
422 bytes: bytes.into(),
423 mode: DEFAULT_HARNPACK_FILE_MODE,
424 }
425 }
426
427 pub fn with_mode(mut self, mode: u32) -> Self {
428 self.mode = mode;
429 self
430 }
431}
432
/// Decoded harnpack archive, as returned by `read_harnpack`.
#[derive(Clone, Debug, PartialEq)]
pub struct HarnpackArchive {
    /// Manifest parsed from the `HARNPACK_MANIFEST_PATH` entry.
    pub manifest: WorkflowBundle,
    /// All other file entries, sorted by path.
    pub contents: Vec<HarnpackEntry>,
}
438
/// Machine-readable classification of a `WorkflowBundleError`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WorkflowBundleErrorKind {
    /// Converted from a `std::io::Error`.
    Io,
    /// Converted from a `serde_json::Error`.
    Json,
    /// The manifest has no (or, for the strict parser, no numeric) `schema_version`.
    MissingSchemaVersion,
    /// The manifest's `schema_version` differs from `WORKFLOW_BUNDLE_SCHEMA_VERSION`.
    /// `actual` is `u32::MAX` when the declared version does not fit in a `u32`.
    UnsupportedSchemaVersion { actual: u32, expected: u32 },
    /// The archive decompresses past the size cap or has no manifest entry.
    InvalidArchive,
    /// Two entries normalize to the same path, or content tries to replace the manifest.
    DuplicateArchiveEntry,
    /// An archive path is empty, absolute, non-UTF-8, contains `..`, or is rejected by the tar header.
    UnsafeArchivePath,
    /// The bundle is unsigned or its signature fails any verification step.
    InvalidSignature,
}
450
/// Error returned by the workflow bundle loading, packing and verification
/// functions. `Display` prints `message` only.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WorkflowBundleError {
    /// Classification callers can match on.
    pub kind: WorkflowBundleErrorKind,
    /// Human-readable description.
    pub message: String,
}
456
457impl WorkflowBundleError {
458 fn new(kind: WorkflowBundleErrorKind, message: impl Into<String>) -> Self {
459 Self {
460 kind,
461 message: message.into(),
462 }
463 }
464
465 fn unsupported_schema_version(actual: u32) -> Self {
466 Self::new(
467 WorkflowBundleErrorKind::UnsupportedSchemaVersion {
468 actual,
469 expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
470 },
471 format!(
472 "unsupported workflow bundle schema_version {actual}; expected {WORKFLOW_BUNDLE_SCHEMA_VERSION}"
473 ),
474 )
475 }
476}
477
478impl std::fmt::Display for WorkflowBundleError {
479 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
480 self.message.fmt(f)
481 }
482}
483
// Marker impl: relies on the trait's default methods, so no `source()` is reported.
impl std::error::Error for WorkflowBundleError {}
485
486impl From<std::io::Error> for WorkflowBundleError {
487 fn from(error: std::io::Error) -> Self {
488 Self::new(WorkflowBundleErrorKind::Io, error.to_string())
489 }
490}
491
492impl From<serde_json::Error> for WorkflowBundleError {
493 fn from(error: serde_json::Error) -> Self {
494 Self::new(WorkflowBundleErrorKind::Json, error.to_string())
495 }
496}
497
498pub fn load_workflow_bundle(path: &Path) -> Result<WorkflowBundle, WorkflowBundleError> {
499 let bytes = fs::read(path)?;
500 if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
501 read_harnpack(&bytes).map(|archive| archive.manifest)
502 } else {
503 parse_workflow_bundle_manifest(&bytes)
504 }
505}
506
507pub fn read_workflow_bundle_manifest_any_version(
513 bytes: &[u8],
514) -> Result<WorkflowBundle, WorkflowBundleError> {
515 let value: serde_json::Value = serde_json::from_slice(bytes)?;
516 if value.get("schema_version").is_none() {
517 return Err(WorkflowBundleError::new(
518 WorkflowBundleErrorKind::MissingSchemaVersion,
519 "workflow bundle manifest is missing schema_version",
520 ));
521 }
522 serde_json::from_value(value).map_err(Into::into)
523}
524
525pub fn load_workflow_bundle_any_version(
529 path: &Path,
530) -> Result<WorkflowBundle, WorkflowBundleError> {
531 let bytes = fs::read(path)?;
532 if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
533 let tar_bytes = decompress_harnpack_zstd(&bytes)?;
534 let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
535 for entry in archive.entries()? {
536 let mut entry = entry?;
537 if entry.header().entry_type().is_dir() {
538 continue;
539 }
540 let path = normalize_archive_path(entry.path()?.as_ref())?;
541 if path == HARNPACK_MANIFEST_PATH {
542 let mut entry_bytes = Vec::new();
543 entry.read_to_end(&mut entry_bytes)?;
544 return read_workflow_bundle_manifest_any_version(&entry_bytes);
545 }
546 }
547 Err(WorkflowBundleError::new(
548 WorkflowBundleErrorKind::InvalidArchive,
549 format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
550 ))
551 } else {
552 read_workflow_bundle_manifest_any_version(&bytes)
553 }
554}
555
556pub fn parse_workflow_bundle_manifest(bytes: &[u8]) -> Result<WorkflowBundle, WorkflowBundleError> {
557 let value: serde_json::Value = serde_json::from_slice(bytes)?;
558 let schema_version = value
559 .get("schema_version")
560 .and_then(serde_json::Value::as_u64)
561 .ok_or_else(|| {
562 WorkflowBundleError::new(
563 WorkflowBundleErrorKind::MissingSchemaVersion,
564 "workflow bundle manifest is missing numeric schema_version",
565 )
566 })?;
567 let actual = u32::try_from(schema_version).map_err(|_| {
568 WorkflowBundleError::new(
569 WorkflowBundleErrorKind::UnsupportedSchemaVersion {
570 actual: u32::MAX,
571 expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
572 },
573 format!(
574 "unsupported workflow bundle schema_version {schema_version}; expected {WORKFLOW_BUNDLE_SCHEMA_VERSION}"
575 ),
576 )
577 })?;
578 if actual != WORKFLOW_BUNDLE_SCHEMA_VERSION {
579 return Err(WorkflowBundleError::unsupported_schema_version(actual));
580 }
581 serde_json::from_value(value).map_err(Into::into)
582}
583
584pub fn canonical_workflow_bundle_manifest_bytes(
585 bundle: &WorkflowBundle,
586) -> Result<Vec<u8>, WorkflowBundleError> {
587 serde_json::to_vec(&canonical_workflow_bundle_manifest(bundle)).map_err(Into::into)
588}
589
590pub fn workflow_bundle_hash(
591 bundle: &WorkflowBundle,
592 contents: &[HarnpackEntry],
593) -> Result<String, WorkflowBundleError> {
594 let mut hasher = blake3::Hasher::new();
595 let mut canonical = canonical_workflow_bundle_manifest(bundle);
596 canonical.signature = None;
597 let manifest_bytes = serde_json::to_vec(&canonical)?;
598 hasher.update(&manifest_bytes);
599
600 let mut content_hashes = contents
601 .iter()
602 .map(|entry| blake3_hash_bytes(&entry.bytes))
603 .collect::<Vec<_>>();
604 content_hashes.sort();
605 for content_hash in content_hashes {
606 hasher.update(b"\n");
607 hasher.update(content_hash.as_bytes());
608 }
609
610 Ok(blake3_digest_string(hasher.finalize()))
611}
612
613pub fn verify_workflow_bundle_signature(
614 bundle: &WorkflowBundle,
615 contents: &[HarnpackEntry],
616) -> Result<(), WorkflowBundleError> {
617 let signature = bundle.signature.as_ref().ok_or_else(|| {
618 WorkflowBundleError::new(
619 WorkflowBundleErrorKind::InvalidSignature,
620 "workflow bundle is unsigned",
621 )
622 })?;
623 if signature.algorithm != "ed25519" {
624 return Err(WorkflowBundleError::new(
625 WorkflowBundleErrorKind::InvalidSignature,
626 format!(
627 "unsupported workflow bundle signature algorithm {}",
628 signature.algorithm
629 ),
630 ));
631 }
632 let expected_hash = workflow_bundle_hash(bundle, contents)?;
633 if signature.manifest_hash_blake3 != expected_hash {
634 return Err(WorkflowBundleError::new(
635 WorkflowBundleErrorKind::InvalidSignature,
636 format!(
637 "workflow bundle signature hash mismatch; expected {expected_hash}, found {}",
638 signature.manifest_hash_blake3
639 ),
640 ));
641 }
642 let public_key_bytes = decode_hex_exact::<32>(
643 "workflow bundle signature public_key",
644 &signature.public_key,
645 )?;
646 let signature_bytes =
647 decode_hex_exact::<64>("workflow bundle signature", &signature.signature)?;
648 let verifying_key = VerifyingKey::from_bytes(&public_key_bytes).map_err(|error| {
649 WorkflowBundleError::new(
650 WorkflowBundleErrorKind::InvalidSignature,
651 format!("workflow bundle signature public_key is invalid Ed25519: {error}"),
652 )
653 })?;
654 let ed25519_signature = Signature::from_bytes(&signature_bytes);
655 verifying_key
656 .verify(expected_hash.as_bytes(), &ed25519_signature)
657 .map_err(|error| {
658 WorkflowBundleError::new(
659 WorkflowBundleErrorKind::InvalidSignature,
660 format!("workflow bundle signature failed Ed25519 verification: {error}"),
661 )
662 })
663}
664
665pub fn build_harnpack(
666 bundle: &WorkflowBundle,
667 contents: &[HarnpackEntry],
668) -> Result<Vec<u8>, WorkflowBundleError> {
669 let manifest_bytes = canonical_workflow_bundle_manifest_bytes(bundle)?;
670 let mut entries = contents
671 .iter()
672 .map(|entry| {
673 normalize_archive_path(&entry.path).map(|path| (path, entry.bytes.clone(), entry.mode))
674 })
675 .collect::<Result<Vec<_>, _>>()?;
676 entries.sort_by(|left, right| left.0.cmp(&right.0));
677
678 let mut seen = BTreeSet::new();
679 for (path, _, _) in &entries {
680 if path == HARNPACK_MANIFEST_PATH {
681 return Err(WorkflowBundleError::new(
682 WorkflowBundleErrorKind::DuplicateArchiveEntry,
683 format!("archive content cannot replace {HARNPACK_MANIFEST_PATH}"),
684 ));
685 }
686 if !seen.insert(path.clone()) {
687 return Err(WorkflowBundleError::new(
688 WorkflowBundleErrorKind::DuplicateArchiveEntry,
689 format!("duplicate archive entry: {path}"),
690 ));
691 }
692 }
693
694 let mut tar_bytes = Vec::new();
695 {
696 let mut builder = tar::Builder::new(&mut tar_bytes);
697 append_harnpack_entry(
698 &mut builder,
699 HARNPACK_MANIFEST_PATH,
700 &manifest_bytes,
701 DEFAULT_HARNPACK_FILE_MODE,
702 )?;
703 for (path, bytes, mode) in entries {
704 append_harnpack_entry(&mut builder, &path, &bytes, mode)?;
705 }
706 builder.finish()?;
707 }
708
709 zstd::stream::encode_all(Cursor::new(tar_bytes), 0).map_err(Into::into)
710}
711
712pub fn read_harnpack(bytes: &[u8]) -> Result<HarnpackArchive, WorkflowBundleError> {
713 let tar_bytes = decompress_harnpack_zstd(bytes)?;
714 let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
715 let mut manifest = None;
716 let mut contents = Vec::new();
717 let mut seen = BTreeSet::new();
718
719 for entry in archive.entries()? {
720 let mut entry = entry?;
721 if entry.header().entry_type().is_dir() {
722 continue;
723 }
724 let path = normalize_archive_path(entry.path()?.as_ref())?;
725 if !seen.insert(path.clone()) {
726 return Err(WorkflowBundleError::new(
727 WorkflowBundleErrorKind::DuplicateArchiveEntry,
728 format!("duplicate archive entry: {path}"),
729 ));
730 }
731 let mode = entry.header().mode().unwrap_or(DEFAULT_HARNPACK_FILE_MODE);
732 let mut entry_bytes = Vec::new();
733 entry.read_to_end(&mut entry_bytes)?;
734
735 if path == HARNPACK_MANIFEST_PATH {
736 manifest = Some(parse_workflow_bundle_manifest(&entry_bytes)?);
737 } else {
738 contents.push(HarnpackEntry {
739 path: PathBuf::from(path),
740 bytes: entry_bytes,
741 mode,
742 });
743 }
744 }
745
746 contents.sort_by(|left, right| left.path.cmp(&right.path));
747 Ok(HarnpackArchive {
748 manifest: manifest.ok_or_else(|| {
749 WorkflowBundleError::new(
750 WorkflowBundleErrorKind::InvalidArchive,
751 format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
752 )
753 })?,
754 contents,
755 })
756}
757
758pub fn current_provider_catalog_hash_blake3() -> Result<String, WorkflowBundleError> {
759 let bytes = serde_json::to_vec(&crate::provider_catalog::artifact())?;
760 Ok(blake3_hash_bytes(&bytes))
761}
762
763pub fn workflow_graph_digest(graph: &WorkflowGraph) -> String {
764 let mut canonical = canonical_workflow_graph(graph);
765 canonical.audit_log.clear();
766 let bytes = serde_json::to_vec(&canonical).expect("workflow graph serializes");
767 let digest = Sha256::digest(bytes);
768 let hex = digest
769 .iter()
770 .map(|byte| format!("{byte:02x}"))
771 .collect::<String>();
772 format!("sha256:{hex}")
773}
774
775fn canonical_workflow_bundle_manifest(bundle: &WorkflowBundle) -> WorkflowBundle {
776 let mut canonical = bundle.clone();
777 canonical.workflow = canonical_workflow_graph(&bundle.workflow);
778 canonical.transitive_modules.sort_by(|left, right| {
779 (
780 path_sort_key(&left.path),
781 &left.source_hash_blake3,
782 &left.harnbc_hash_blake3,
783 )
784 .cmp(&(
785 path_sort_key(&right.path),
786 &right.source_hash_blake3,
787 &right.harnbc_hash_blake3,
788 ))
789 });
790 canonical.tool_manifest.sort_by(|left, right| {
791 (&left.name, &left.provider, &left.schema_hash_blake3).cmp(&(
792 &right.name,
793 &right.provider,
794 &right.schema_hash_blake3,
795 ))
796 });
797 canonical.sbom.packages.sort_by(|left, right| {
798 (&left.name, &left.version, &left.package_hash_blake3).cmp(&(
799 &right.name,
800 &right.version,
801 &right.package_hash_blake3,
802 ))
803 });
804 canonical.sbom.relationships.sort_by(|left, right| {
805 (&left.from, &left.to, &left.relationship_type).cmp(&(
806 &right.from,
807 &right.to,
808 &right.relationship_type,
809 ))
810 });
811 canonical
812}
813
814fn decode_hex_exact<const N: usize>(
815 label: &str,
816 value: &str,
817) -> Result<[u8; N], WorkflowBundleError> {
818 let bytes = hex::decode(value).map_err(|error| {
819 WorkflowBundleError::new(
820 WorkflowBundleErrorKind::InvalidSignature,
821 format!("{label} is not hex: {error}"),
822 )
823 })?;
824 bytes.try_into().map_err(|bytes: Vec<u8>| {
825 WorkflowBundleError::new(
826 WorkflowBundleErrorKind::InvalidSignature,
827 format!("{label} must decode to {N} bytes, got {}", bytes.len()),
828 )
829 })
830}
831
832fn append_harnpack_entry<W: std::io::Write>(
833 builder: &mut tar::Builder<W>,
834 path: &str,
835 bytes: &[u8],
836 mode: u32,
837) -> Result<(), WorkflowBundleError> {
838 let mut header = tar::Header::new_gnu();
839 header.set_path(path).map_err(|error| {
840 WorkflowBundleError::new(
841 WorkflowBundleErrorKind::UnsafeArchivePath,
842 format!("invalid archive path {path}: {error}"),
843 )
844 })?;
845 header.set_entry_type(tar::EntryType::Regular);
846 header.set_size(bytes.len() as u64);
847 header.set_mode(mode);
848 header.set_mtime(0);
849 header.set_uid(0);
850 header.set_gid(0);
851 header.set_cksum();
852 builder.append(&header, bytes)?;
853 Ok(())
854}
855
856fn normalize_archive_path(path: &Path) -> Result<String, WorkflowBundleError> {
857 let mut parts = Vec::new();
858 for component in path.components() {
859 match component {
860 Component::Normal(part) => {
861 let Some(part) = part.to_str() else {
862 return Err(WorkflowBundleError::new(
863 WorkflowBundleErrorKind::UnsafeArchivePath,
864 format!("archive path is not valid UTF-8: {}", path.display()),
865 ));
866 };
867 if part.is_empty() {
868 return Err(WorkflowBundleError::new(
869 WorkflowBundleErrorKind::UnsafeArchivePath,
870 "archive path contains an empty component",
871 ));
872 }
873 parts.push(part.to_string());
874 }
875 Component::CurDir => {}
876 Component::ParentDir | Component::RootDir | Component::Prefix(_) => {
877 return Err(WorkflowBundleError::new(
878 WorkflowBundleErrorKind::UnsafeArchivePath,
879 format!(
880 "archive path must be relative and contained: {}",
881 path.display()
882 ),
883 ));
884 }
885 }
886 }
887 if parts.is_empty() {
888 return Err(WorkflowBundleError::new(
889 WorkflowBundleErrorKind::UnsafeArchivePath,
890 "archive path is empty",
891 ));
892 }
893 Ok(parts.join("/"))
894}
895
/// Builds a `/`-joined sort key from the normal, UTF-8 components of `path`.
///
/// Root, prefix, `.` and `..` components and non-UTF-8 components are skipped,
/// so the key is for ordering only, not for locating files.
fn path_sort_key(path: &Path) -> String {
    let mut key = String::new();
    let mut needs_separator = false;
    for component in path.components() {
        let Component::Normal(part) = component else {
            continue;
        };
        let Some(part) = part.to_str() else {
            continue;
        };
        if needs_separator {
            key.push('/');
        }
        key.push_str(part);
        needs_separator = true;
    }
    key
}
905
906fn blake3_hash_bytes(bytes: &[u8]) -> String {
907 blake3_digest_string(blake3::hash(bytes))
908}
909
910fn blake3_digest_string(hash: blake3::Hash) -> String {
911 format!("blake3:{hash}")
912}
913
914pub fn validate_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundleValidationReport {
915 let canonical = canonical_workflow_graph(&bundle.workflow);
916 let mut report = WorkflowBundleValidationReport {
917 valid: true,
918 bundle_id: bundle.id.clone(),
919 workflow_id: canonical.id.clone(),
920 graph_digest: workflow_graph_digest(&canonical),
921 errors: Vec::new(),
922 warnings: Vec::new(),
923 };
924
925 validate_manifest_contract(bundle, &mut report);
926 validate_bundle_identity(bundle, &canonical, &mut report);
927 validate_triggers(bundle, &canonical, &mut report);
928 validate_prompt_capsules(bundle, &canonical, &mut report);
929 validate_policy(bundle, &mut report);
930 validate_connectors(bundle, &mut report);
931 validate_environment(bundle, &mut report);
932
933 let graph_report = validate_workflow(&canonical, None);
934 for error in graph_report.errors {
935 let node_id = workflow_diagnostic_node_id(&error, &canonical);
936 push_error(&mut report, "workflow", error, node_id);
937 }
938 for warning in graph_report.warnings {
939 let node_id = workflow_diagnostic_node_id(&warning, &canonical);
940 push_warning(&mut report, "workflow", warning, node_id);
941 }
942
943 if let Some(expected) = bundle.receipts.graph_digest.as_deref() {
944 if expected != report.graph_digest {
945 let actual = report.graph_digest.clone();
946 push_error(
947 &mut report,
948 "receipts.graph_digest",
949 format!("graph digest mismatch: expected {expected}, computed {actual}"),
950 None,
951 );
952 }
953 }
954 if let Some(expected_version) = bundle.receipts.workflow_version {
955 if expected_version != canonical.version {
956 push_error(
957 &mut report,
958 "receipts.workflow_version",
959 format!(
960 "workflow version mismatch: expected {expected_version}, computed {}",
961 canonical.version
962 ),
963 None,
964 );
965 }
966 }
967
968 report.valid = report.errors.is_empty();
969 report
970}
971
972pub fn preview_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundlePreview {
973 let canonical = canonical_workflow_graph(&bundle.workflow);
974 let validation = validate_workflow_bundle(bundle);
975 let graph = export_workflow_bundle_graph(bundle, &validation);
976 let mermaid = graph.mermaid.clone();
977 let triggers_by_node = triggers_by_node(bundle);
978 let capsules_by_node = capsules_by_node(bundle);
979 let mut nodes = Vec::new();
980
981 for (node_id, node) in &canonical.nodes {
982 let mut outgoing = canonical
983 .edges
984 .iter()
985 .filter(|edge| edge.from == *node_id)
986 .map(|edge| edge.to.clone())
987 .collect::<Vec<_>>();
988 outgoing.sort();
989 outgoing.dedup();
990 nodes.push(WorkflowBundlePreviewNode {
991 id: node_id.clone(),
992 kind: node.kind.clone(),
993 label: node.task_label.clone(),
994 prompt_capsule: capsules_by_node.get(node_id).cloned(),
995 trigger_ids: triggers_by_node.get(node_id).cloned().unwrap_or_default(),
996 outgoing,
997 });
998 }
999
1000 WorkflowBundlePreview {
1001 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1002 bundle_id: bundle.id.clone(),
1003 bundle_version: bundle.version.clone(),
1004 workflow_id: canonical.id.clone(),
1005 workflow_version: canonical.version,
1006 graph_digest: validation.graph_digest.clone(),
1007 validation,
1008 graph,
1009 mermaid,
1010 triggers: bundle.triggers.clone(),
1011 connectors: bundle.connectors.clone(),
1012 environment: bundle.environment.clone(),
1013 nodes,
1014 edges: sorted_edges(&canonical),
1015 }
1016}
1017
1018pub fn export_workflow_bundle_graph(
1019 bundle: &WorkflowBundle,
1020 validation: &WorkflowBundleValidationReport,
1021) -> WorkflowBundleGraphExport {
1022 let canonical = canonical_workflow_graph(&bundle.workflow);
1023 let mut nodes = Vec::new();
1024 let mut edges = Vec::new();
1025 let mut editable_fields = Vec::new();
1026 let capsules_by_node = capsules_by_node(bundle);
1027 let catchup_enabled = bundle.policy.catchup.mode != "none";
1028 let retry_can_dlq = bundle.policy.retry.max_attempts > 1;
1029
1030 for (index, connector) in bundle.connectors.iter().enumerate() {
1031 let node_fields = connector_editable_fields(index, connector);
1032 editable_fields.extend(node_fields.clone());
1033 nodes.push(WorkflowBundleGraphNode {
1034 id: connector_graph_id(&connector.id),
1035 node_type: "connector_call".to_string(),
1036 label: connector_label(connector),
1037 workflow_node_id: None,
1038 trigger_id: None,
1039 connector_id: Some(connector.id.clone()),
1040 editable_fields: node_fields,
1041 metadata: BTreeMap::from([
1042 (
1043 "provider_id".to_string(),
1044 serde_json::json!(connector.provider_id),
1045 ),
1046 ("scopes".to_string(), serde_json::json!(connector.scopes)),
1047 ]),
1048 });
1049 }
1050
1051 let catchup_fields = catchup_editable_fields();
1052 let retry_fields = retry_editable_fields();
1053 if catchup_enabled {
1054 let node_fields = catchup_fields;
1055 editable_fields.extend(node_fields.clone());
1056 nodes.push(WorkflowBundleGraphNode {
1057 id: catchup_graph_id(),
1058 node_type: "catchup".to_string(),
1059 label: "Catch up".to_string(),
1060 workflow_node_id: None,
1061 trigger_id: None,
1062 connector_id: None,
1063 editable_fields: node_fields,
1064 metadata: BTreeMap::from([(
1065 "mode".to_string(),
1066 serde_json::json!(bundle.policy.catchup.mode),
1067 )]),
1068 });
1069 } else {
1070 editable_fields.extend(catchup_fields);
1071 }
1072 if retry_can_dlq {
1073 let node_fields = retry_fields;
1074 editable_fields.extend(node_fields.clone());
1075 nodes.push(WorkflowBundleGraphNode {
1076 id: dlq_graph_id(),
1077 node_type: "dlq".to_string(),
1078 label: "Dead letter queue".to_string(),
1079 workflow_node_id: None,
1080 trigger_id: None,
1081 connector_id: None,
1082 editable_fields: node_fields,
1083 metadata: BTreeMap::from([(
1084 "max_attempts".to_string(),
1085 serde_json::json!(bundle.policy.retry.max_attempts),
1086 )]),
1087 });
1088 } else {
1089 editable_fields.extend(retry_fields);
1090 }
1091
1092 for (index, trigger) in bundle.triggers.iter().enumerate() {
1093 let node_fields = trigger_editable_fields(index, trigger);
1094 editable_fields.extend(node_fields.clone());
1095 nodes.push(WorkflowBundleGraphNode {
1096 id: trigger_graph_id(&trigger.id),
1097 node_type: "trigger".to_string(),
1098 label: trigger_label(trigger),
1099 workflow_node_id: trigger.node_id.clone(),
1100 trigger_id: Some(trigger.id.clone()),
1101 connector_id: None,
1102 editable_fields: node_fields,
1103 metadata: BTreeMap::from([
1104 ("kind".to_string(), serde_json::json!(trigger.kind)),
1105 ("provider".to_string(), serde_json::json!(trigger.provider)),
1106 ("events".to_string(), serde_json::json!(trigger.events)),
1107 ]),
1108 });
1109 if let Some(provider) = trigger.provider.as_deref() {
1110 if let Some(connector) = bundle
1111 .connectors
1112 .iter()
1113 .find(|connector| connector.provider_id == provider || connector.id == provider)
1114 {
1115 edges.push(WorkflowBundleGraphEdge {
1116 from: connector_graph_id(&connector.id),
1117 to: trigger_graph_id(&trigger.id),
1118 label: Some("binds".to_string()),
1119 branch: None,
1120 });
1121 }
1122 }
1123 let target = trigger
1124 .node_id
1125 .clone()
1126 .unwrap_or_else(|| canonical.entry.clone());
1127 if catchup_enabled {
1128 edges.push(WorkflowBundleGraphEdge {
1129 from: trigger_graph_id(&trigger.id),
1130 to: catchup_graph_id(),
1131 label: Some(bundle.policy.catchup.mode.clone()),
1132 branch: Some("catchup".to_string()),
1133 });
1134 edges.push(WorkflowBundleGraphEdge {
1135 from: catchup_graph_id(),
1136 to: workflow_graph_id(&target),
1137 label: Some("dispatch".to_string()),
1138 branch: None,
1139 });
1140 } else {
1141 edges.push(WorkflowBundleGraphEdge {
1142 from: trigger_graph_id(&trigger.id),
1143 to: workflow_graph_id(&target),
1144 label: Some("dispatch".to_string()),
1145 branch: None,
1146 });
1147 }
1148 }
1149
1150 for (node_id, node) in &canonical.nodes {
1151 let capsule_id = capsules_by_node.get(node_id);
1152 let node_fields = workflow_node_editable_fields(node_id, capsule_id);
1153 editable_fields.extend(node_fields.clone());
1154 nodes.push(WorkflowBundleGraphNode {
1155 id: workflow_graph_id(node_id),
1156 node_type: workflow_node_type(&node.kind),
1157 label: workflow_node_label(node_id, node),
1158 workflow_node_id: Some(node_id.clone()),
1159 trigger_id: None,
1160 connector_id: None,
1161 editable_fields: node_fields,
1162 metadata: BTreeMap::from([
1163 ("kind".to_string(), serde_json::json!(node.kind)),
1164 ("task_label".to_string(), serde_json::json!(node.task_label)),
1165 (
1166 "prompt_capsule".to_string(),
1167 serde_json::json!(capsule_id.cloned()),
1168 ),
1169 ]),
1170 });
1171 }
1172
1173 for edge in sorted_edges(&canonical) {
1174 edges.push(WorkflowBundleGraphEdge {
1175 from: workflow_graph_id(&edge.from),
1176 to: workflow_graph_id(&edge.to),
1177 label: edge.label.clone(),
1178 branch: edge.branch.clone(),
1179 });
1180 }
1181
1182 let outgoing: BTreeSet<&str> = canonical
1183 .edges
1184 .iter()
1185 .map(|edge| edge.from.as_str())
1186 .collect();
1187 for node_id in canonical.nodes.keys() {
1188 if !outgoing.contains(node_id.as_str()) {
1189 edges.push(WorkflowBundleGraphEdge {
1190 from: workflow_graph_id(node_id),
1191 to: terminal_completed_graph_id(),
1192 label: Some("completed".to_string()),
1193 branch: Some("completed".to_string()),
1194 });
1195 }
1196 if retry_can_dlq {
1197 edges.push(WorkflowBundleGraphEdge {
1198 from: workflow_graph_id(node_id),
1199 to: dlq_graph_id(),
1200 label: Some("retry exhausted".to_string()),
1201 branch: Some("failed".to_string()),
1202 });
1203 }
1204 }
1205
1206 nodes.push(WorkflowBundleGraphNode {
1207 id: terminal_completed_graph_id(),
1208 node_type: "terminal".to_string(),
1209 label: "Completed".to_string(),
1210 workflow_node_id: None,
1211 trigger_id: None,
1212 connector_id: None,
1213 editable_fields: Vec::new(),
1214 metadata: BTreeMap::from([("status".to_string(), serde_json::json!("completed"))]),
1215 });
1216 nodes.push(WorkflowBundleGraphNode {
1217 id: terminal_failed_graph_id(),
1218 node_type: "terminal".to_string(),
1219 label: "Failed".to_string(),
1220 workflow_node_id: None,
1221 trigger_id: None,
1222 connector_id: None,
1223 editable_fields: Vec::new(),
1224 metadata: BTreeMap::from([("status".to_string(), serde_json::json!("failed"))]),
1225 });
1226 if retry_can_dlq {
1227 edges.push(WorkflowBundleGraphEdge {
1228 from: dlq_graph_id(),
1229 to: terminal_failed_graph_id(),
1230 label: Some("failed".to_string()),
1231 branch: Some("failed".to_string()),
1232 });
1233 }
1234
1235 nodes.sort_by(|left, right| left.id.cmp(&right.id));
1236 edges.sort_by(|left, right| {
1237 (&left.from, &left.to, &left.branch, &left.label).cmp(&(
1238 &right.from,
1239 &right.to,
1240 &right.branch,
1241 &right.label,
1242 ))
1243 });
1244 editable_fields.sort_by(|left, right| left.id.cmp(&right.id));
1245
1246 let diagnostics = validation
1247 .errors
1248 .iter()
1249 .chain(validation.warnings.iter())
1250 .map(|diagnostic| WorkflowBundleGraphDiagnostic {
1251 severity: diagnostic.severity.clone(),
1252 path: diagnostic.path.clone(),
1253 message: diagnostic.message.clone(),
1254 node_id: diagnostic.node_id.clone(),
1255 graph_node_id: diagnostic.node_id.as_deref().map(workflow_graph_id),
1256 })
1257 .collect::<Vec<_>>();
1258 let mermaid = render_workflow_bundle_mermaid(&nodes, &edges);
1259
1260 WorkflowBundleGraphExport {
1261 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1262 graph_id: canonical.id,
1263 graph_digest: validation.graph_digest.clone(),
1264 nodes,
1265 edges,
1266 diagnostics,
1267 editable_fields,
1268 mermaid,
1269 }
1270}
1271
1272pub fn run_workflow_bundle(
1273 bundle: &WorkflowBundle,
1274 request: WorkflowBundleRunRequest,
1275) -> Result<WorkflowBundleRunReceipt, WorkflowBundleValidationReport> {
1276 let validation = validate_workflow_bundle(bundle);
1277 if !validation.valid {
1278 return Err(validation);
1279 }
1280
1281 let canonical = canonical_workflow_graph(&bundle.workflow);
1282 let trigger_id = match request.trigger_id {
1283 Some(trigger_id)
1284 if !bundle
1285 .triggers
1286 .iter()
1287 .any(|trigger| trigger.id == trigger_id) =>
1288 {
1289 let mut report = validation;
1290 push_error(
1291 &mut report,
1292 "trigger_id",
1293 format!("unknown trigger id: {trigger_id}"),
1294 None,
1295 );
1296 report.valid = false;
1297 return Err(report);
1298 }
1299 Some(trigger_id) => Some(trigger_id),
1300 None => bundle.triggers.first().map(|trigger| trigger.id.clone()),
1301 };
1302 let mut event_ids = bundle.receipts.event_ids.clone();
1303 if let Some(event_id) = request.event_id {
1304 if !event_ids.contains(&event_id) {
1305 event_ids.push(event_id);
1306 }
1307 }
1308 let run_id = bundle
1309 .receipts
1310 .run_id
1311 .clone()
1312 .unwrap_or_else(|| default_run_id(bundle, &validation.graph_digest));
1313 let capsules_by_node = capsules_by_node(bundle);
1314 let executed_nodes = execution_order(&canonical)
1315 .into_iter()
1316 .map(|node_id| {
1317 let node = canonical
1318 .nodes
1319 .get(&node_id)
1320 .expect("execution order only contains known nodes");
1321 WorkflowBundleRunNodeReceipt {
1322 node_id: node_id.clone(),
1323 kind: node.kind.clone(),
1324 prompt_capsule: capsules_by_node.get(&node_id).cloned(),
1325 status: "completed".to_string(),
1326 }
1327 })
1328 .collect();
1329
1330 Ok(WorkflowBundleRunReceipt {
1331 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1332 receipt_type: WORKFLOW_BUNDLE_RECEIPT_TYPE.to_string(),
1333 bundle_id: bundle.id.clone(),
1334 bundle_version: bundle.version.clone(),
1335 workflow_id: canonical.id,
1336 workflow_version: canonical.version,
1337 graph_digest: validation.graph_digest,
1338 run_id,
1339 trigger_id,
1340 event_ids,
1341 status: "completed".to_string(),
1342 executed_nodes,
1343 policy: bundle.policy.clone(),
1344 connectors: bundle.connectors.clone(),
1345 environment: bundle.environment.clone(),
1346 })
1347}
1348
1349fn canonical_workflow_graph(graph: &WorkflowGraph) -> WorkflowGraph {
1350 let mut canonical = graph.clone();
1351 if canonical.type_name.is_empty() {
1352 canonical.type_name = "workflow_graph".to_string();
1353 }
1354 if canonical.version == 0 {
1355 canonical.version = 1;
1356 }
1357 if canonical.entry.is_empty() {
1358 canonical.entry = canonical.nodes.keys().next().cloned().unwrap_or_default();
1359 }
1360 for (node_id, node) in &mut canonical.nodes {
1361 if node.id.is_none() {
1362 node.id = Some(node_id.clone());
1363 }
1364 if node.kind.is_empty() {
1365 node.kind = "stage".to_string();
1366 }
1367 if node.retry_policy.max_attempts == 0 {
1368 node.retry_policy.max_attempts = 1;
1369 }
1370 }
1371 canonical.edges = sorted_edges(&canonical);
1372 canonical
1373}
1374
1375fn sorted_edges(graph: &WorkflowGraph) -> Vec<WorkflowEdge> {
1376 let mut edges = graph.edges.clone();
1377 edges.sort_by(|left, right| {
1378 (
1379 &left.from,
1380 &left.to,
1381 left.branch.as_deref(),
1382 left.label.as_deref(),
1383 )
1384 .cmp(&(
1385 &right.from,
1386 &right.to,
1387 right.branch.as_deref(),
1388 right.label.as_deref(),
1389 ))
1390 });
1391 edges
1392}
1393
1394fn validate_bundle_identity(
1395 bundle: &WorkflowBundle,
1396 graph: &WorkflowGraph,
1397 report: &mut WorkflowBundleValidationReport,
1398) {
1399 if bundle.schema_version != WORKFLOW_BUNDLE_SCHEMA_VERSION {
1400 push_error(
1401 report,
1402 "schema_version",
1403 format!(
1404 "unsupported schema_version {}; expected {}",
1405 bundle.schema_version, WORKFLOW_BUNDLE_SCHEMA_VERSION
1406 ),
1407 None,
1408 );
1409 }
1410 if bundle.id.trim().is_empty() {
1411 push_error(report, "id", "bundle id is required", None);
1412 }
1413 if bundle.version.trim().is_empty() {
1414 push_error(report, "version", "bundle version is required", None);
1415 }
1416 if graph.id.trim().is_empty() {
1417 push_error(
1418 report,
1419 "workflow.id",
1420 "workflow id is required for portable bundles",
1421 None,
1422 );
1423 }
1424 if graph.nodes.is_empty() {
1425 push_error(
1426 report,
1427 "workflow.nodes",
1428 "workflow must contain nodes",
1429 None,
1430 );
1431 }
1432 for (node_id, node) in &graph.nodes {
1433 if node_id.trim().is_empty() {
1434 push_error(report, "workflow.nodes", "node id is required", None);
1435 }
1436 if node.id.as_deref().is_some_and(|id| id != node_id) {
1437 push_error(
1438 report,
1439 format!("workflow.nodes.{node_id}.id"),
1440 "node id field must match its map key",
1441 Some(node_id.clone()),
1442 );
1443 }
1444 }
1445}
1446
1447fn validate_manifest_contract(
1448 bundle: &WorkflowBundle,
1449 report: &mut WorkflowBundleValidationReport,
1450) {
1451 validate_relative_path(
1452 report,
1453 "entrypoint",
1454 &bundle.entrypoint,
1455 "entrypoint is required",
1456 );
1457 if bundle.transitive_modules.is_empty() {
1458 push_error(
1459 report,
1460 "transitive_modules",
1461 "at least one transitive module entry is required",
1462 None,
1463 );
1464 }
1465 let mut module_paths = BTreeSet::new();
1466 for (index, module) in bundle.transitive_modules.iter().enumerate() {
1467 let path = format!("transitive_modules[{index}]");
1468 validate_relative_path(
1469 report,
1470 format!("{path}.path"),
1471 &module.path,
1472 "module path is required",
1473 );
1474 if !module.path.as_os_str().is_empty() && !module_paths.insert(path_sort_key(&module.path))
1475 {
1476 push_error(
1477 report,
1478 format!("{path}.path"),
1479 format!(
1480 "duplicate transitive module path: {}",
1481 module.path.display()
1482 ),
1483 None,
1484 );
1485 }
1486 validate_blake3_hash(
1487 report,
1488 format!("{path}.source_hash_blake3"),
1489 &module.source_hash_blake3,
1490 true,
1491 );
1492 validate_blake3_hash(
1493 report,
1494 format!("{path}.harnbc_hash_blake3"),
1495 &module.harnbc_hash_blake3,
1496 true,
1497 );
1498 }
1499 if bundle.stdlib_version.trim().is_empty() {
1500 push_error(report, "stdlib_version", "stdlib_version is required", None);
1501 }
1502 if bundle.harn_version.trim().is_empty() {
1503 push_error(report, "harn_version", "harn_version is required", None);
1504 }
1505 validate_blake3_hash(
1506 report,
1507 "provider_catalog_hash",
1508 &bundle.provider_catalog_hash,
1509 true,
1510 );
1511
1512 let mut tool_names = BTreeSet::new();
1513 for (index, tool) in bundle.tool_manifest.iter().enumerate() {
1514 let path = format!("tool_manifest[{index}]");
1515 if tool.name.trim().is_empty() {
1516 push_error(
1517 report,
1518 format!("{path}.name"),
1519 "tool manifest entry name is required",
1520 None,
1521 );
1522 } else if !tool_names.insert(tool.name.clone()) {
1523 push_error(
1524 report,
1525 format!("{path}.name"),
1526 format!("duplicate tool manifest entry: {}", tool.name),
1527 None,
1528 );
1529 }
1530 if let Some(hash) = tool.schema_hash_blake3.as_deref() {
1531 validate_blake3_hash(report, format!("{path}.schema_hash_blake3"), hash, true);
1532 }
1533 }
1534
1535 if bundle.sbom.format.trim().is_empty() {
1536 push_error(report, "sbom.format", "SBOM format is required", None);
1537 }
1538 if bundle.sbom.version.trim().is_empty() {
1539 push_error(report, "sbom.version", "SBOM version is required", None);
1540 }
1541 let mut sbom_packages = BTreeSet::new();
1542 for (index, package) in bundle.sbom.packages.iter().enumerate() {
1543 let path = format!("sbom.packages[{index}]");
1544 if package.name.trim().is_empty() {
1545 push_error(
1546 report,
1547 format!("{path}.name"),
1548 "SBOM package name is required",
1549 None,
1550 );
1551 } else if !sbom_packages.insert(package.name.clone()) {
1552 push_error(
1553 report,
1554 format!("{path}.name"),
1555 format!("duplicate SBOM package: {}", package.name),
1556 None,
1557 );
1558 }
1559 if let Some(hash) = package.package_hash_blake3.as_deref() {
1560 validate_blake3_hash(report, format!("{path}.package_hash_blake3"), hash, true);
1561 }
1562 }
1563 for (index, relationship) in bundle.sbom.relationships.iter().enumerate() {
1564 let path = format!("sbom.relationships[{index}]");
1565 if relationship.from.trim().is_empty() {
1566 push_error(
1567 report,
1568 format!("{path}.from"),
1569 "SBOM relationship source is required",
1570 None,
1571 );
1572 }
1573 if relationship.to.trim().is_empty() {
1574 push_error(
1575 report,
1576 format!("{path}.to"),
1577 "SBOM relationship target is required",
1578 None,
1579 );
1580 }
1581 if relationship.relationship_type.trim().is_empty() {
1582 push_error(
1583 report,
1584 format!("{path}.relationship_type"),
1585 "SBOM relationship type is required",
1586 None,
1587 );
1588 }
1589 }
1590
1591 if let Some(parent_id) = bundle.parent_trust_record_id.as_deref() {
1592 if parent_id.trim().is_empty() {
1593 push_error(
1594 report,
1595 "parent_trust_record_id",
1596 "parent_trust_record_id cannot be empty when present",
1597 None,
1598 );
1599 }
1600 }
1601 if let Some(signature) = &bundle.signature {
1602 if signature.algorithm.trim().is_empty() {
1603 push_error(
1604 report,
1605 "signature.algorithm",
1606 "signature algorithm is required",
1607 None,
1608 );
1609 } else if signature.algorithm != "ed25519" {
1610 push_error(
1611 report,
1612 "signature.algorithm",
1613 "signature algorithm must be ed25519",
1614 None,
1615 );
1616 }
1617 if signature.public_key.trim().is_empty() {
1618 push_error(
1619 report,
1620 "signature.public_key",
1621 "signature public_key is required",
1622 None,
1623 );
1624 }
1625 if signature.signature.trim().is_empty() {
1626 push_error(
1627 report,
1628 "signature.signature",
1629 "signature value is required",
1630 None,
1631 );
1632 }
1633 validate_blake3_hash(
1634 report,
1635 "signature.manifest_hash_blake3",
1636 &signature.manifest_hash_blake3,
1637 true,
1638 );
1639 }
1640}
1641
1642fn validate_relative_path(
1643 report: &mut WorkflowBundleValidationReport,
1644 path: impl Into<String>,
1645 value: &Path,
1646 empty_message: &str,
1647) {
1648 let path = path.into();
1649 if value.as_os_str().is_empty() {
1650 push_error(report, path, empty_message, None);
1651 return;
1652 }
1653 if normalize_archive_path(value).is_err() {
1654 push_error(
1655 report,
1656 path,
1657 format!("path must be relative and contained: {}", value.display()),
1658 None,
1659 );
1660 }
1661}
1662
1663fn validate_blake3_hash(
1664 report: &mut WorkflowBundleValidationReport,
1665 path: impl Into<String>,
1666 value: &str,
1667 required: bool,
1668) {
1669 let path = path.into();
1670 if value.trim().is_empty() {
1671 if required {
1672 push_error(report, path, "BLAKE3 hash is required", None);
1673 }
1674 return;
1675 }
1676 let Some(hex) = value.strip_prefix("blake3:") else {
1677 push_error(report, path, "BLAKE3 hash must use blake3:<hex>", None);
1678 return;
1679 };
1680 if hex.len() != 64
1681 || !hex
1682 .bytes()
1683 .all(|byte| byte.is_ascii_digit() || matches!(byte, b'a'..=b'f'))
1684 {
1685 push_error(
1686 report,
1687 path,
1688 "BLAKE3 hash must contain 64 lowercase hex digits",
1689 None,
1690 );
1691 }
1692}
1693
1694fn validate_triggers(
1695 bundle: &WorkflowBundle,
1696 graph: &WorkflowGraph,
1697 report: &mut WorkflowBundleValidationReport,
1698) {
1699 if bundle.triggers.is_empty() {
1700 push_warning(report, "triggers", "bundle declares no triggers", None);
1701 }
1702 let mut ids = BTreeSet::new();
1703 for (index, trigger) in bundle.triggers.iter().enumerate() {
1704 let path = format!("triggers[{index}]");
1705 if trigger.id.trim().is_empty() {
1706 push_error(report, format!("{path}.id"), "trigger id is required", None);
1707 } else if !ids.insert(trigger.id.clone()) {
1708 push_error(
1709 report,
1710 format!("{path}.id"),
1711 format!("duplicate trigger id: {}", trigger.id),
1712 None,
1713 );
1714 }
1715 match trigger.kind.as_str() {
1716 "github" => {
1717 if trigger.provider.as_deref() != Some("github") {
1718 push_error(
1719 report,
1720 format!("{path}.provider"),
1721 "github triggers require provider=\"github\"",
1722 None,
1723 );
1724 }
1725 if trigger.events.is_empty() {
1726 push_error(
1727 report,
1728 format!("{path}.events"),
1729 "github triggers require at least one event",
1730 None,
1731 );
1732 }
1733 }
1734 "cron" if trigger.schedule.is_none() => push_error(
1735 report,
1736 format!("{path}.schedule"),
1737 "cron triggers require schedule",
1738 None,
1739 ),
1740 "delay" if trigger.delay.is_none() => push_error(
1741 report,
1742 format!("{path}.delay"),
1743 "delay triggers require delay",
1744 None,
1745 ),
1746 "webhook" if trigger.webhook_path.is_none() => push_error(
1747 report,
1748 format!("{path}.webhook_path"),
1749 "webhook triggers require webhook_path",
1750 None,
1751 ),
1752 "mcp" if trigger.mcp_tool.is_none() => push_error(
1753 report,
1754 format!("{path}.mcp_tool"),
1755 "mcp triggers require mcp_tool",
1756 None,
1757 ),
1758 "manual" => {}
1759 "" => push_error(
1760 report,
1761 format!("{path}.kind"),
1762 "trigger kind is required",
1763 None,
1764 ),
1765 other
1766 if !matches!(
1767 other,
1768 "github" | "cron" | "delay" | "webhook" | "mcp" | "manual"
1769 ) =>
1770 {
1771 push_error(
1772 report,
1773 format!("{path}.kind"),
1774 format!("unsupported trigger kind: {other}"),
1775 None,
1776 );
1777 }
1778 _ => {}
1779 }
1780 if let Some(node_id) = trigger.node_id.as_deref() {
1781 if !graph.nodes.contains_key(node_id) {
1782 push_error(
1783 report,
1784 format!("{path}.node_id"),
1785 format!("trigger references unknown node: {node_id}"),
1786 Some(node_id.to_string()),
1787 );
1788 }
1789 }
1790 }
1791}
1792
1793fn validate_prompt_capsules(
1794 bundle: &WorkflowBundle,
1795 graph: &WorkflowGraph,
1796 report: &mut WorkflowBundleValidationReport,
1797) {
1798 let trigger_ids: BTreeSet<&str> = bundle
1799 .triggers
1800 .iter()
1801 .map(|trigger| trigger.id.as_str())
1802 .collect();
1803 let mut node_refs = BTreeSet::new();
1804 for (key, capsule) in &bundle.prompt_capsules {
1805 let path = format!("prompt_capsules.{key}");
1806 if capsule.id.trim().is_empty() {
1807 push_error(
1808 report,
1809 format!("{path}.id"),
1810 "prompt capsule id is required",
1811 None,
1812 );
1813 } else if capsule.id != *key {
1814 push_error(
1815 report,
1816 format!("{path}.id"),
1817 "prompt capsule id must match its map key",
1818 None,
1819 );
1820 }
1821 if capsule.prompt.trim().is_empty() {
1822 push_error(
1823 report,
1824 format!("{path}.prompt"),
1825 "prompt capsule prompt is required",
1826 Some(capsule.node_id.clone()),
1827 );
1828 }
1829 if !graph.nodes.contains_key(&capsule.node_id) {
1830 push_error(
1831 report,
1832 format!("{path}.node_id"),
1833 format!(
1834 "prompt capsule references unknown node: {}",
1835 capsule.node_id
1836 ),
1837 Some(capsule.node_id.clone()),
1838 );
1839 }
1840 if !capsule.node_id.is_empty() && !node_refs.insert(capsule.node_id.clone()) {
1841 push_error(
1842 report,
1843 format!("{path}.node_id"),
1844 format!("multiple prompt capsules target node {}", capsule.node_id),
1845 Some(capsule.node_id.clone()),
1846 );
1847 }
1848 if let Some(trigger_id) = capsule.trigger_id.as_deref() {
1849 if !trigger_ids.contains(trigger_id) {
1850 push_error(
1851 report,
1852 format!("{path}.trigger_id"),
1853 format!("prompt capsule references unknown trigger: {trigger_id}"),
1854 Some(capsule.node_id.clone()),
1855 );
1856 }
1857 }
1858 }
1859}
1860
1861fn validate_policy(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1862 if !matches!(
1863 bundle.policy.autonomy_tier.as_str(),
1864 "shadow" | "suggest" | "act_with_approval" | "act_auto"
1865 ) {
1866 push_error(
1867 report,
1868 "policy.autonomy_tier",
1869 "autonomy_tier must be shadow, suggest, act_with_approval, or act_auto",
1870 None,
1871 );
1872 }
1873 if bundle.policy.retry.max_attempts == 0 {
1874 push_error(
1875 report,
1876 "policy.retry.max_attempts",
1877 "retry.max_attempts must be at least 1",
1878 None,
1879 );
1880 }
1881 if !matches!(
1882 bundle.policy.catchup.mode.as_str(),
1883 "none" | "latest" | "all"
1884 ) {
1885 push_error(
1886 report,
1887 "policy.catchup.mode",
1888 "catchup.mode must be none, latest, or all",
1889 None,
1890 );
1891 }
1892}
1893
1894fn validate_connectors(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1895 let mut ids = BTreeSet::new();
1896 let provider_ids: BTreeSet<&str> = bundle
1897 .connectors
1898 .iter()
1899 .map(|connector| connector.provider_id.as_str())
1900 .collect();
1901 for (index, connector) in bundle.connectors.iter().enumerate() {
1902 let path = format!("connectors[{index}]");
1903 if connector.id.trim().is_empty() {
1904 push_error(
1905 report,
1906 format!("{path}.id"),
1907 "connector id is required",
1908 None,
1909 );
1910 } else if !ids.insert(connector.id.clone()) {
1911 push_error(
1912 report,
1913 format!("{path}.id"),
1914 format!("duplicate connector id: {}", connector.id),
1915 None,
1916 );
1917 }
1918 if connector.provider_id.trim().is_empty() {
1919 push_error(
1920 report,
1921 format!("{path}.provider_id"),
1922 "connector provider_id is required",
1923 None,
1924 );
1925 }
1926 }
1927 for trigger in &bundle.triggers {
1928 if let Some(provider) = trigger.provider.as_deref() {
1929 if !provider_ids.contains(provider) {
1930 push_warning(
1931 report,
1932 "connectors",
1933 format!(
1934 "trigger {} references provider {provider} with no connector requirement",
1935 trigger.id
1936 ),
1937 trigger.node_id.clone(),
1938 );
1939 }
1940 }
1941 }
1942}
1943
1944fn validate_environment(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1945 if !matches!(
1946 bundle.environment.worktree_policy.as_str(),
1947 "reuse_current" | "new_worktree" | "host_managed"
1948 ) {
1949 push_error(
1950 report,
1951 "environment.worktree_policy",
1952 "worktree_policy must be reuse_current, new_worktree, or host_managed",
1953 None,
1954 );
1955 }
1956}
1957
1958fn workflow_diagnostic_node_id(message: &str, graph: &WorkflowGraph) -> Option<String> {
1959 for prefix in [
1960 "node is unreachable: ",
1961 "edge.from references unknown node: ",
1962 "edge.to references unknown node: ",
1963 "entry node does not exist: ",
1964 ] {
1965 if let Some(node_id) = message.strip_prefix(prefix) {
1966 return Some(node_id.to_string());
1967 }
1968 }
1969 if let Some(rest) = message.strip_prefix("node ") {
1970 if let Some((node_id, _)) = rest.split_once(':') {
1971 return Some(node_id.to_string());
1972 }
1973 }
1974 graph
1975 .nodes
1976 .keys()
1977 .find(|node_id| message.contains(&format!("node {node_id}:")))
1978 .cloned()
1979}
1980
/// Graph id of a workflow node: the node id prefixed with `node/`.
fn workflow_graph_id(node_id: &str) -> String {
    ["node/", node_id].concat()
}
1984
/// Graph id of a trigger: the trigger id prefixed with `trigger/`.
fn trigger_graph_id(trigger_id: &str) -> String {
    ["trigger/", trigger_id].concat()
}
1988
/// Graph id of a connector: the connector id prefixed with `connector/`.
fn connector_graph_id(connector_id: &str) -> String {
    ["connector/", connector_id].concat()
}
1992
/// Graph id of the catch-up policy node.
fn catchup_graph_id() -> String {
    String::from("policy/catchup")
}
1996
/// Graph id of the dead letter queue policy node.
fn dlq_graph_id() -> String {
    String::from("policy/dlq")
}
2000
/// Graph id of the terminal node reached on completion.
fn terminal_completed_graph_id() -> String {
    String::from("terminal/completed")
}
2004
/// Graph id of the terminal node reached on failure.
fn terminal_failed_graph_id() -> String {
    String::from("terminal/failed")
}
2008
/// Maps a workflow node `kind` to the node type used in the graph export.
///
/// Known aliases collapse to one type per group, a blank kind maps to
/// `"agent"`, and any other kind is passed through unchanged.
fn workflow_node_type(kind: &str) -> String {
    // (accepted kinds, exported node type)
    const ALIASES: [(&[&str], &str); 8] = [
        (&["action"], "action"),
        (&["stage", "agent"], "agent"),
        (&["subagent", "worker"], "subagent"),
        (&["wait", "waitpoint", "delay"], "wait"),
        (&["approval", "hitl"], "approval"),
        (&["connector", "connector_call"], "connector_call"),
        (&["notification", "notify"], "notification"),
        (&["terminal", "success", "failure"], "terminal"),
    ];

    let aliased = ALIASES
        .iter()
        .find(|(kinds, _)| kinds.iter().any(|candidate| *candidate == kind))
        .map(|(_, node_type)| *node_type);
    match aliased {
        Some(node_type) => node_type.to_string(),
        None if kind.trim().is_empty() => "agent".to_string(),
        None => kind.to_string(),
    }
}
2024
2025fn workflow_node_label(node_id: &str, node: &super::WorkflowNode) -> String {
2026 node.task_label
2027 .clone()
2028 .or_else(|| node.prompt.clone())
2029 .map(|label| label.trim().to_string())
2030 .filter(|label| !label.is_empty())
2031 .unwrap_or_else(|| node_id.to_string())
2032}
2033
2034fn trigger_label(trigger: &WorkflowBundleTrigger) -> String {
2035 if !trigger.events.is_empty() {
2036 format!("{}: {}", trigger.kind, trigger.events.join(", "))
2037 } else if let Some(schedule) = trigger.schedule.as_deref() {
2038 format!("cron: {schedule}")
2039 } else if let Some(delay) = trigger.delay.as_deref() {
2040 format!("delay: {delay}")
2041 } else {
2042 trigger.id.clone()
2043 }
2044}
2045
2046fn connector_label(connector: &ConnectorRequirement) -> String {
2047 if connector.provider_id.is_empty() || connector.provider_id == connector.id {
2048 connector.id.clone()
2049 } else {
2050 format!("{} ({})", connector.id, connector.provider_id)
2051 }
2052}
2053
2054fn editable_field(
2055 id: impl Into<String>,
2056 label: impl Into<String>,
2057 json_pointer: impl Into<String>,
2058 value_type: impl Into<String>,
2059 required: bool,
2060 enum_values: &[&str],
2061) -> WorkflowBundleEditableField {
2062 WorkflowBundleEditableField {
2063 id: id.into(),
2064 label: label.into(),
2065 json_pointer: json_pointer.into(),
2066 value_type: value_type.into(),
2067 required,
2068 enum_values: enum_values
2069 .iter()
2070 .map(|value| (*value).to_string())
2071 .collect(),
2072 }
2073}
2074
/// Escapes `value` for use as a single JSON Pointer reference token:
/// `~` becomes `~0` and `/` becomes `~1`.
fn json_pointer_segment(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '~' => escaped.push_str("~0"),
            '/' => escaped.push_str("~1"),
            other => escaped.push(other),
        }
    }
    escaped
}
2078
2079fn trigger_editable_fields(
2080 index: usize,
2081 trigger: &WorkflowBundleTrigger,
2082) -> Vec<WorkflowBundleEditableField> {
2083 let base = format!("/triggers/{index}");
2084 let mut fields = vec![
2085 editable_field(
2086 format!("trigger.{}.kind", trigger.id),
2087 "Trigger kind",
2088 format!("{base}/kind"),
2089 "enum",
2090 true,
2091 &["github", "cron", "delay", "manual", "webhook", "mcp"],
2092 ),
2093 editable_field(
2094 format!("trigger.{}.node_id", trigger.id),
2095 "Target node",
2096 format!("{base}/node_id"),
2097 "string",
2098 false,
2099 &[],
2100 ),
2101 ];
2102 if trigger.provider.is_some() || trigger.kind == "github" {
2103 fields.push(editable_field(
2104 format!("trigger.{}.provider", trigger.id),
2105 "Provider",
2106 format!("{base}/provider"),
2107 "string",
2108 trigger.kind == "github",
2109 &[],
2110 ));
2111 }
2112 for (field, label, value_type) in [
2113 ("events", "Events", "list"),
2114 ("schedule", "Schedule", "string"),
2115 ("delay", "Delay", "string"),
2116 ("webhook_path", "Webhook path", "string"),
2117 ("mcp_tool", "MCP tool", "string"),
2118 ("resume_key", "Resume key", "string"),
2119 ("metadata", "Metadata", "object"),
2120 ] {
2121 fields.push(editable_field(
2122 format!("trigger.{}.{}", trigger.id, field),
2123 label,
2124 format!("{base}/{field}"),
2125 value_type,
2126 false,
2127 &[],
2128 ));
2129 }
2130 fields
2131}
2132
2133fn workflow_node_editable_fields(
2134 node_id: &str,
2135 capsule_id: Option<&String>,
2136) -> Vec<WorkflowBundleEditableField> {
2137 let escaped_node = json_pointer_segment(node_id);
2138 let mut fields = vec![
2139 editable_field(
2140 format!("workflow.{node_id}.task_label"),
2141 "Task label",
2142 format!("/workflow/nodes/{escaped_node}/task_label"),
2143 "string",
2144 false,
2145 &[],
2146 ),
2147 editable_field(
2148 format!("workflow.{node_id}.prompt"),
2149 "Prompt",
2150 format!("/workflow/nodes/{escaped_node}/prompt"),
2151 "string",
2152 false,
2153 &[],
2154 ),
2155 editable_field(
2156 format!("workflow.{node_id}.system"),
2157 "System prompt",
2158 format!("/workflow/nodes/{escaped_node}/system"),
2159 "string",
2160 false,
2161 &[],
2162 ),
2163 editable_field(
2164 format!("workflow.{node_id}.model_policy"),
2165 "Model policy",
2166 format!("/workflow/nodes/{escaped_node}/model_policy"),
2167 "object",
2168 false,
2169 &[],
2170 ),
2171 editable_field(
2172 format!("workflow.{node_id}.tools"),
2173 "Tool policy",
2174 format!("/workflow/nodes/{escaped_node}/tools"),
2175 "any",
2176 false,
2177 &[],
2178 ),
2179 editable_field(
2180 format!("workflow.{node_id}.capability_policy"),
2181 "Capability policy",
2182 format!("/workflow/nodes/{escaped_node}/capability_policy"),
2183 "object",
2184 false,
2185 &[],
2186 ),
2187 editable_field(
2188 format!("workflow.{node_id}.approval_policy"),
2189 "Approval policy",
2190 format!("/workflow/nodes/{escaped_node}/approval_policy"),
2191 "object",
2192 false,
2193 &[],
2194 ),
2195 editable_field(
2196 format!("workflow.{node_id}.retry_policy"),
2197 "Retry policy",
2198 format!("/workflow/nodes/{escaped_node}/retry_policy"),
2199 "object",
2200 false,
2201 &[],
2202 ),
2203 ];
2204 if let Some(capsule_id) = capsule_id {
2205 let escaped_capsule = json_pointer_segment(capsule_id);
2206 fields.extend([
2207 editable_field(
2208 format!("prompt_capsule.{capsule_id}.prompt"),
2209 "Prompt capsule",
2210 format!("/prompt_capsules/{escaped_capsule}/prompt"),
2211 "string",
2212 true,
2213 &[],
2214 ),
2215 editable_field(
2216 format!("prompt_capsule.{capsule_id}.system"),
2217 "Prompt capsule system",
2218 format!("/prompt_capsules/{escaped_capsule}/system"),
2219 "string",
2220 false,
2221 &[],
2222 ),
2223 editable_field(
2224 format!("prompt_capsule.{capsule_id}.context"),
2225 "Prompt capsule context",
2226 format!("/prompt_capsules/{escaped_capsule}/context"),
2227 "object",
2228 false,
2229 &[],
2230 ),
2231 editable_field(
2232 format!("prompt_capsule.{capsule_id}.trigger_id"),
2233 "Prompt capsule trigger",
2234 format!("/prompt_capsules/{escaped_capsule}/trigger_id"),
2235 "string",
2236 false,
2237 &[],
2238 ),
2239 ]);
2240 }
2241 fields
2242}
2243
2244fn connector_editable_fields(
2245 index: usize,
2246 connector: &ConnectorRequirement,
2247) -> Vec<WorkflowBundleEditableField> {
2248 let base = format!("/connectors/{index}");
2249 [
2250 ("id", "Connector id", "string", true),
2251 ("provider_id", "Provider id", "string", true),
2252 ("scopes", "Scopes", "list", false),
2253 ("setup_required", "Setup required", "bool", false),
2254 ("status_required", "Status required", "bool", false),
2255 ]
2256 .into_iter()
2257 .map(|(field, label, value_type, required)| {
2258 editable_field(
2259 format!("connector.{}.{}", connector.id, field),
2260 label,
2261 format!("{base}/{field}"),
2262 value_type,
2263 required,
2264 &[],
2265 )
2266 })
2267 .collect()
2268}
2269
2270fn retry_editable_fields() -> Vec<WorkflowBundleEditableField> {
2271 vec![
2272 editable_field(
2273 "policy.retry.max_attempts",
2274 "Retry attempts",
2275 "/policy/retry/max_attempts",
2276 "integer",
2277 true,
2278 &[],
2279 ),
2280 editable_field(
2281 "policy.retry.backoff",
2282 "Retry backoff",
2283 "/policy/retry/backoff",
2284 "string",
2285 true,
2286 &[],
2287 ),
2288 ]
2289}
2290
2291fn catchup_editable_fields() -> Vec<WorkflowBundleEditableField> {
2292 vec![
2293 editable_field(
2294 "policy.catchup.mode",
2295 "Catchup mode",
2296 "/policy/catchup/mode",
2297 "enum",
2298 true,
2299 &["none", "latest", "all"],
2300 ),
2301 editable_field(
2302 "policy.catchup.max_events",
2303 "Catchup max events",
2304 "/policy/catchup/max_events",
2305 "integer",
2306 false,
2307 &[],
2308 ),
2309 ]
2310}
2311
2312fn render_workflow_bundle_mermaid(
2313 nodes: &[WorkflowBundleGraphNode],
2314 edges: &[WorkflowBundleGraphEdge],
2315) -> String {
2316 let mut lines = vec!["flowchart TD".to_string()];
2317 for node in nodes {
2318 lines.push(format!(
2319 " {}[\"{}\"]",
2320 mermaid_id(&node.id),
2321 mermaid_label(&format!("{}: {}", node.node_type, node.label))
2322 ));
2323 }
2324 for edge in edges {
2325 let label = edge
2326 .label
2327 .as_deref()
2328 .or(edge.branch.as_deref())
2329 .map(mermaid_label);
2330 match label {
2331 Some(label) if !label.is_empty() => lines.push(format!(
2332 " {} -->|{}| {}",
2333 mermaid_id(&edge.from),
2334 label,
2335 mermaid_id(&edge.to)
2336 )),
2337 _ => lines.push(format!(
2338 " {} --> {}",
2339 mermaid_id(&edge.from),
2340 mermaid_id(&edge.to)
2341 )),
2342 }
2343 }
2344 lines.join("\n")
2345}
2346
2347fn mermaid_id(value: &str) -> String {
2348 let digest = Sha256::digest(value.as_bytes());
2349 let suffix = digest
2350 .iter()
2351 .take(4)
2352 .map(|byte| format!("{byte:02x}"))
2353 .collect::<String>();
2354 let mut out = format!("n_{suffix}_");
2355 for ch in value.chars() {
2356 if ch.is_ascii_alphanumeric() {
2357 out.push(ch);
2358 } else {
2359 out.push('_');
2360 }
2361 }
2362 out
2363}
2364
/// Escapes text for embedding in a single-line label of the Mermaid output.
///
/// * `\` becomes `\\` and `"` becomes `\"`.
/// * Every line break becomes a single space. `\r\n` and a lone `\r` are
///   handled in addition to `\n`, so text with CRLF or CR line endings does
///   not leave a raw carriage return inside the one-line statement.
///
/// All other characters are copied through unchanged. Output for input that
/// contains no `\r` is identical to a plain `\n` -> space replacement.
fn mermaid_label(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    let mut chars = value.chars().peekable();
    while let Some(ch) = chars.next() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\r' => {
                // Fold CRLF into one break so it yields one space, not two.
                if chars.peek() == Some(&'\n') {
                    chars.next();
                }
                escaped.push(' ');
            }
            '\n' => escaped.push(' '),
            other => escaped.push(other),
        }
    }
    escaped
}
2371
2372fn triggers_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, Vec<String>> {
2373 let mut by_node: BTreeMap<String, Vec<String>> = BTreeMap::new();
2374 for trigger in &bundle.triggers {
2375 if let Some(node_id) = trigger.node_id.as_ref() {
2376 by_node
2377 .entry(node_id.clone())
2378 .or_default()
2379 .push(trigger.id.clone());
2380 }
2381 }
2382 by_node
2383}
2384
2385fn capsules_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, String> {
2386 bundle
2387 .prompt_capsules
2388 .iter()
2389 .map(|(id, capsule)| (capsule.node_id.clone(), id.clone()))
2390 .collect()
2391}
2392
2393fn execution_order(graph: &WorkflowGraph) -> Vec<String> {
2394 let outgoing =
2395 graph
2396 .edges
2397 .iter()
2398 .fold(BTreeMap::<String, Vec<String>>::new(), |mut acc, edge| {
2399 acc.entry(edge.from.clone())
2400 .or_default()
2401 .push(edge.to.clone());
2402 acc
2403 });
2404 let mut seen = BTreeSet::new();
2405 let mut queue = VecDeque::from([graph.entry.clone()]);
2406 let mut order = Vec::new();
2407 while let Some(node_id) = queue.pop_front() {
2408 if !graph.nodes.contains_key(&node_id) || !seen.insert(node_id.clone()) {
2409 continue;
2410 }
2411 order.push(node_id.clone());
2412 if let Some(next) = outgoing.get(&node_id) {
2413 let mut next = next.clone();
2414 next.sort();
2415 for child in next {
2416 queue.push_back(child);
2417 }
2418 }
2419 }
2420 order
2421}
2422
2423fn default_run_id(bundle: &WorkflowBundle, graph_digest: &str) -> String {
2424 let suffix = graph_digest
2425 .strip_prefix("sha256:")
2426 .unwrap_or(graph_digest)
2427 .chars()
2428 .take(12)
2429 .collect::<String>();
2430 format!("bundle_run_{}_{}", sanitize_id(&bundle.id), suffix)
2431}
2432
/// Replaces every character other than ASCII letters, ASCII digits, `_` and
/// `-` with `_`.
///
/// The replacement is per character, so the result has the same number of
/// characters as the input.
fn sanitize_id(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-');
        cleaned.push(if allowed { ch } else { '_' });
    }
    cleaned
}
2445
2446fn push_error(
2447 report: &mut WorkflowBundleValidationReport,
2448 path: impl Into<String>,
2449 message: impl Into<String>,
2450 node_id: Option<String>,
2451) {
2452 report.errors.push(WorkflowBundleDiagnostic {
2453 severity: "error".to_string(),
2454 path: path.into(),
2455 message: message.into(),
2456 node_id,
2457 });
2458}
2459
2460fn push_warning(
2461 report: &mut WorkflowBundleValidationReport,
2462 path: impl Into<String>,
2463 message: impl Into<String>,
2464 node_id: Option<String>,
2465) {
2466 report.warnings.push(WorkflowBundleDiagnostic {
2467 severity: "warning".to_string(),
2468 path: path.into(),
2469 message: message.into(),
2470 node_id,
2471 });
2472}
2473
2474#[cfg(test)]
2475mod tests;