1use std::collections::{BTreeMap, BTreeSet, VecDeque};
5use std::fs;
6use std::io::{Cursor, Read};
7use std::path::{Component, Path, PathBuf};
8
9use ed25519_dalek::{Signature, Verifier, VerifyingKey};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12
13use super::{validate_workflow, WorkflowEdge, WorkflowGraph};
14use crate::tool_annotations::ToolAnnotations;
15
16pub const WORKFLOW_BUNDLE_SCHEMA_VERSION: u32 = 2;
17pub const WORKFLOW_BUNDLE_RECEIPT_TYPE: &str = "harn.workflow_bundle.run";
18pub const HARNPACK_MANIFEST_PATH: &str = "harnpack.json";
19
20const DEFAULT_HARNPACK_FILE_MODE: u32 = 0o644;
21
22const MAX_HARNPACK_DECOMPRESSED_BYTES: u64 = 100 * 1024 * 1024;
28
29fn decompress_harnpack_zstd(bytes: &[u8]) -> Result<Vec<u8>, WorkflowBundleError> {
34 let mut decoder = zstd::stream::Decoder::new(Cursor::new(bytes))?;
35 let mut output = Vec::new();
36 let mut limited = (&mut decoder).take(MAX_HARNPACK_DECOMPRESSED_BYTES.saturating_add(1));
37 limited.read_to_end(&mut output)?;
38 if output.len() as u64 > MAX_HARNPACK_DECOMPRESSED_BYTES {
39 return Err(WorkflowBundleError::new(
40 WorkflowBundleErrorKind::InvalidArchive,
41 format!(
42 "harnpack zstd payload decompressed past max size ({} bytes); refusing to extract",
43 MAX_HARNPACK_DECOMPRESSED_BYTES
44 ),
45 ));
46 }
47 Ok(output)
48}
49
50#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
51#[serde(default)]
52pub struct WorkflowBundle {
53 pub schema_version: u32,
54 pub entrypoint: PathBuf,
55 pub transitive_modules: Vec<ModuleEntry>,
56 pub stdlib_version: String,
57 pub harn_version: String,
58 pub provider_catalog_hash: String,
59 pub tool_manifest: Vec<ToolEntry>,
60 pub sbom: SBOMDoc,
61 pub signature: Option<Ed25519Signature>,
62 pub parent_trust_record_id: Option<String>,
63 pub id: String,
64 pub name: Option<String>,
65 pub version: String,
66 pub triggers: Vec<WorkflowBundleTrigger>,
67 pub workflow: WorkflowGraph,
68 pub prompt_capsules: BTreeMap<String, PromptCapsule>,
69 pub policy: WorkflowBundlePolicy,
70 pub connectors: Vec<ConnectorRequirement>,
71 pub environment: EnvironmentRequirements,
72 pub receipts: WorkflowBundleReplayMetadata,
73 pub metadata: BTreeMap<String, serde_json::Value>,
74}
75
76impl Default for WorkflowBundle {
77 fn default() -> Self {
78 Self {
79 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
80 entrypoint: PathBuf::new(),
81 transitive_modules: Vec::new(),
82 stdlib_version: env!("CARGO_PKG_VERSION").to_string(),
83 harn_version: env!("CARGO_PKG_VERSION").to_string(),
84 provider_catalog_hash: String::new(),
85 tool_manifest: Vec::new(),
86 sbom: SBOMDoc::default(),
87 signature: None,
88 parent_trust_record_id: None,
89 id: String::new(),
90 name: None,
91 version: String::new(),
92 triggers: Vec::new(),
93 workflow: WorkflowGraph::default(),
94 prompt_capsules: BTreeMap::new(),
95 policy: WorkflowBundlePolicy::default(),
96 connectors: Vec::new(),
97 environment: EnvironmentRequirements::default(),
98 receipts: WorkflowBundleReplayMetadata::default(),
99 metadata: BTreeMap::new(),
100 }
101 }
102}
103
104#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
105#[serde(default)]
106pub struct ModuleEntry {
107 pub path: PathBuf,
108 pub source_hash_blake3: String,
109 pub harnbc_hash_blake3: String,
110}
111
112#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
113#[serde(default)]
114pub struct ToolEntry {
115 pub name: String,
116 pub provider: Option<String>,
117 pub annotations: Option<ToolAnnotations>,
118 pub schema_hash_blake3: Option<String>,
119 pub metadata: BTreeMap<String, String>,
120}
121
122#[allow(clippy::upper_case_acronyms)]
123#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
124#[serde(default)]
125pub struct SBOMDoc {
126 pub format: String,
127 pub version: String,
128 pub packages: Vec<SBOMPackage>,
129 pub relationships: Vec<SBOMRelationship>,
130}
131
132#[allow(clippy::upper_case_acronyms)]
133#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
134#[serde(default)]
135pub struct SBOMPackage {
136 pub name: String,
137 pub version: Option<String>,
138 pub package_hash_blake3: Option<String>,
139 pub license: Option<String>,
140}
141
142#[allow(clippy::upper_case_acronyms)]
143#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
144#[serde(default)]
145pub struct SBOMRelationship {
146 pub from: String,
147 pub to: String,
148 pub relationship_type: String,
149}
150
151#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
152#[serde(default)]
153pub struct Ed25519Signature {
154 pub key_id: Option<String>,
155 pub public_key: String,
156 pub signature: String,
157 pub manifest_hash_blake3: String,
158 pub algorithm: String,
159}
160
161#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
162#[serde(default)]
163pub struct WorkflowBundleTrigger {
164 pub id: String,
165 pub kind: String,
166 pub provider: Option<String>,
167 pub events: Vec<String>,
168 pub schedule: Option<String>,
169 pub delay: Option<String>,
170 pub webhook_path: Option<String>,
171 pub mcp_tool: Option<String>,
172 pub resume_key: Option<String>,
173 pub node_id: Option<String>,
174 pub metadata: BTreeMap<String, String>,
175}
176
177#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
178#[serde(default)]
179pub struct PromptCapsule {
180 pub id: String,
181 pub node_id: String,
182 pub trigger_id: Option<String>,
183 pub prompt: String,
184 pub system: Option<String>,
185 pub context: BTreeMap<String, serde_json::Value>,
186}
187
188#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
189#[serde(default)]
190pub struct WorkflowBundlePolicy {
191 pub autonomy_tier: String,
192 pub tool_policy: BTreeMap<String, serde_json::Value>,
193 pub approval_required: Vec<String>,
194 pub retry: RetryPolicySpec,
195 pub catchup: CatchupPolicySpec,
196}
197
198impl Default for WorkflowBundlePolicy {
199 fn default() -> Self {
200 Self {
201 autonomy_tier: "act_with_approval".to_string(),
202 tool_policy: BTreeMap::new(),
203 approval_required: Vec::new(),
204 retry: RetryPolicySpec::default(),
205 catchup: CatchupPolicySpec::default(),
206 }
207 }
208}
209
210#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
211#[serde(default)]
212pub struct RetryPolicySpec {
213 pub max_attempts: u32,
214 pub backoff: String,
215}
216
217impl Default for RetryPolicySpec {
218 fn default() -> Self {
219 Self {
220 max_attempts: 1,
221 backoff: "none".to_string(),
222 }
223 }
224}
225
226#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
227#[serde(default)]
228pub struct CatchupPolicySpec {
229 pub mode: String,
230 pub max_events: Option<u32>,
231}
232
233impl Default for CatchupPolicySpec {
234 fn default() -> Self {
235 Self {
236 mode: "latest".to_string(),
237 max_events: Some(1),
238 }
239 }
240}
241
242#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
243#[serde(default)]
244pub struct ConnectorRequirement {
245 pub id: String,
246 pub provider_id: String,
247 pub scopes: Vec<String>,
248 pub setup_required: bool,
249 pub status_required: bool,
250}
251
252#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
253#[serde(default)]
254pub struct EnvironmentRequirements {
255 pub repo_setup_profile: Option<String>,
256 pub worktree_policy: String,
257 pub command_gates: Vec<String>,
258}
259
260impl Default for EnvironmentRequirements {
261 fn default() -> Self {
262 Self {
263 repo_setup_profile: None,
264 worktree_policy: "host_managed".to_string(),
265 command_gates: Vec::new(),
266 }
267 }
268}
269
270#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
271#[serde(default)]
272pub struct WorkflowBundleReplayMetadata {
273 pub run_id: Option<String>,
274 pub event_ids: Vec<String>,
275 pub workflow_version: Option<usize>,
276 pub graph_digest: Option<String>,
277}
278
279#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
280#[serde(default)]
281pub struct WorkflowBundleDiagnostic {
282 pub severity: String,
283 pub path: String,
284 pub message: String,
285 pub node_id: Option<String>,
286}
287
288#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
289#[serde(default)]
290pub struct WorkflowBundleValidationReport {
291 pub valid: bool,
292 pub bundle_id: String,
293 pub workflow_id: String,
294 pub graph_digest: String,
295 pub errors: Vec<WorkflowBundleDiagnostic>,
296 pub warnings: Vec<WorkflowBundleDiagnostic>,
297}
298
299#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
300pub struct WorkflowBundlePreview {
301 pub schema_version: u32,
302 pub bundle_id: String,
303 pub bundle_version: String,
304 pub workflow_id: String,
305 pub workflow_version: usize,
306 pub graph_digest: String,
307 pub validation: WorkflowBundleValidationReport,
308 pub graph: WorkflowBundleGraphExport,
309 pub mermaid: String,
310 pub triggers: Vec<WorkflowBundleTrigger>,
311 pub connectors: Vec<ConnectorRequirement>,
312 pub environment: EnvironmentRequirements,
313 pub nodes: Vec<WorkflowBundlePreviewNode>,
314 pub edges: Vec<WorkflowEdge>,
315}
316
317#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
318pub struct WorkflowBundlePreviewNode {
319 pub id: String,
320 pub kind: String,
321 pub label: Option<String>,
322 pub prompt_capsule: Option<String>,
323 pub trigger_ids: Vec<String>,
324 pub outgoing: Vec<String>,
325}
326
327#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
328pub struct WorkflowBundleGraphExport {
329 pub schema_version: u32,
330 pub graph_id: String,
331 pub graph_digest: String,
332 pub nodes: Vec<WorkflowBundleGraphNode>,
333 pub edges: Vec<WorkflowBundleGraphEdge>,
334 pub diagnostics: Vec<WorkflowBundleGraphDiagnostic>,
335 pub editable_fields: Vec<WorkflowBundleEditableField>,
336 pub mermaid: String,
337}
338
339#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
340pub struct WorkflowBundleGraphNode {
341 pub id: String,
342 pub node_type: String,
343 pub label: String,
344 pub workflow_node_id: Option<String>,
345 pub trigger_id: Option<String>,
346 pub connector_id: Option<String>,
347 pub editable_fields: Vec<WorkflowBundleEditableField>,
348 pub metadata: BTreeMap<String, serde_json::Value>,
349}
350
351#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
352pub struct WorkflowBundleGraphEdge {
353 pub from: String,
354 pub to: String,
355 pub label: Option<String>,
356 pub branch: Option<String>,
357}
358
359#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
360pub struct WorkflowBundleGraphDiagnostic {
361 pub severity: String,
362 pub path: String,
363 pub message: String,
364 pub node_id: Option<String>,
365 pub graph_node_id: Option<String>,
366}
367
368#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
369pub struct WorkflowBundleEditableField {
370 pub id: String,
371 pub label: String,
372 pub json_pointer: String,
373 pub value_type: String,
374 pub required: bool,
375 pub enum_values: Vec<String>,
376}
377
378#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
379#[serde(default)]
380pub struct WorkflowBundleRunRequest {
381 pub trigger_id: Option<String>,
382 pub event_id: Option<String>,
383}
384
385#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
386pub struct WorkflowBundleRunReceipt {
387 pub schema_version: u32,
388 pub receipt_type: String,
389 pub bundle_id: String,
390 pub bundle_version: String,
391 pub workflow_id: String,
392 pub workflow_version: usize,
393 pub graph_digest: String,
394 pub run_id: String,
395 pub trigger_id: Option<String>,
396 pub event_ids: Vec<String>,
397 pub status: String,
398 pub executed_nodes: Vec<WorkflowBundleRunNodeReceipt>,
399 pub policy: WorkflowBundlePolicy,
400 pub connectors: Vec<ConnectorRequirement>,
401 pub environment: EnvironmentRequirements,
402}
403
404#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
405pub struct WorkflowBundleRunNodeReceipt {
406 pub node_id: String,
407 pub kind: String,
408 pub prompt_capsule: Option<String>,
409 pub status: String,
410}
411
412#[derive(Clone, Debug, PartialEq, Eq)]
413pub struct HarnpackEntry {
414 pub path: PathBuf,
415 pub bytes: Vec<u8>,
416 pub mode: u32,
417}
418
419impl HarnpackEntry {
420 pub fn new(path: impl Into<PathBuf>, bytes: impl Into<Vec<u8>>) -> Self {
421 Self {
422 path: path.into(),
423 bytes: bytes.into(),
424 mode: DEFAULT_HARNPACK_FILE_MODE,
425 }
426 }
427
428 pub fn with_mode(mut self, mode: u32) -> Self {
429 self.mode = mode;
430 self
431 }
432}
433
434#[derive(Clone, Debug, PartialEq)]
435pub struct HarnpackArchive {
436 pub manifest: WorkflowBundle,
437 pub contents: Vec<HarnpackEntry>,
438}
439
440#[derive(Clone, Debug, PartialEq, Eq)]
441pub enum WorkflowBundleErrorKind {
442 Io,
443 Json,
444 MissingSchemaVersion,
445 UnsupportedSchemaVersion { actual: u32, expected: u32 },
446 InvalidArchive,
447 DuplicateArchiveEntry,
448 UnsafeArchivePath,
449 InvalidSignature,
450}
451
452#[derive(Clone, Debug, PartialEq, Eq)]
453pub struct WorkflowBundleError {
454 pub kind: WorkflowBundleErrorKind,
455 pub message: String,
456}
457
458impl WorkflowBundleError {
459 fn new(kind: WorkflowBundleErrorKind, message: impl Into<String>) -> Self {
460 Self {
461 kind,
462 message: message.into(),
463 }
464 }
465
466 fn unsupported_schema_version(actual: u32) -> Self {
467 Self::new(
468 WorkflowBundleErrorKind::UnsupportedSchemaVersion {
469 actual,
470 expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
471 },
472 format!(
473 "unsupported workflow bundle schema_version {actual}; expected {}",
474 WORKFLOW_BUNDLE_SCHEMA_VERSION
475 ),
476 )
477 }
478}
479
480impl std::fmt::Display for WorkflowBundleError {
481 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
482 self.message.fmt(f)
483 }
484}
485
486impl std::error::Error for WorkflowBundleError {}
487
488impl From<std::io::Error> for WorkflowBundleError {
489 fn from(error: std::io::Error) -> Self {
490 Self::new(WorkflowBundleErrorKind::Io, error.to_string())
491 }
492}
493
494impl From<serde_json::Error> for WorkflowBundleError {
495 fn from(error: serde_json::Error) -> Self {
496 Self::new(WorkflowBundleErrorKind::Json, error.to_string())
497 }
498}
499
500pub fn load_workflow_bundle(path: &Path) -> Result<WorkflowBundle, WorkflowBundleError> {
501 let bytes = fs::read(path)?;
502 if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
503 read_harnpack(&bytes).map(|archive| archive.manifest)
504 } else {
505 parse_workflow_bundle_manifest(&bytes)
506 }
507}
508
509pub fn read_workflow_bundle_manifest_any_version(
515 bytes: &[u8],
516) -> Result<WorkflowBundle, WorkflowBundleError> {
517 let value: serde_json::Value = serde_json::from_slice(bytes)?;
518 if value.get("schema_version").is_none() {
519 return Err(WorkflowBundleError::new(
520 WorkflowBundleErrorKind::MissingSchemaVersion,
521 "workflow bundle manifest is missing schema_version",
522 ));
523 }
524 serde_json::from_value(value).map_err(Into::into)
525}
526
527pub fn load_workflow_bundle_any_version(
531 path: &Path,
532) -> Result<WorkflowBundle, WorkflowBundleError> {
533 let bytes = fs::read(path)?;
534 if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
535 let tar_bytes = decompress_harnpack_zstd(&bytes)?;
536 let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
537 for entry in archive.entries()? {
538 let mut entry = entry?;
539 if entry.header().entry_type().is_dir() {
540 continue;
541 }
542 let path = normalize_archive_path(entry.path()?.as_ref())?;
543 if path == HARNPACK_MANIFEST_PATH {
544 let mut entry_bytes = Vec::new();
545 entry.read_to_end(&mut entry_bytes)?;
546 return read_workflow_bundle_manifest_any_version(&entry_bytes);
547 }
548 }
549 Err(WorkflowBundleError::new(
550 WorkflowBundleErrorKind::InvalidArchive,
551 format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
552 ))
553 } else {
554 read_workflow_bundle_manifest_any_version(&bytes)
555 }
556}
557
558pub fn parse_workflow_bundle_manifest(bytes: &[u8]) -> Result<WorkflowBundle, WorkflowBundleError> {
559 let value: serde_json::Value = serde_json::from_slice(bytes)?;
560 let schema_version = value
561 .get("schema_version")
562 .and_then(serde_json::Value::as_u64)
563 .ok_or_else(|| {
564 WorkflowBundleError::new(
565 WorkflowBundleErrorKind::MissingSchemaVersion,
566 "workflow bundle manifest is missing numeric schema_version",
567 )
568 })?;
569 let actual = u32::try_from(schema_version).map_err(|_| {
570 WorkflowBundleError::new(
571 WorkflowBundleErrorKind::UnsupportedSchemaVersion {
572 actual: u32::MAX,
573 expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
574 },
575 format!(
576 "unsupported workflow bundle schema_version {schema_version}; expected {}",
577 WORKFLOW_BUNDLE_SCHEMA_VERSION
578 ),
579 )
580 })?;
581 if actual != WORKFLOW_BUNDLE_SCHEMA_VERSION {
582 return Err(WorkflowBundleError::unsupported_schema_version(actual));
583 }
584 serde_json::from_value(value).map_err(Into::into)
585}
586
587pub fn canonical_workflow_bundle_manifest_bytes(
588 bundle: &WorkflowBundle,
589) -> Result<Vec<u8>, WorkflowBundleError> {
590 serde_json::to_vec(&canonical_workflow_bundle_manifest(bundle)).map_err(Into::into)
591}
592
593pub fn workflow_bundle_hash(
594 bundle: &WorkflowBundle,
595 contents: &[HarnpackEntry],
596) -> Result<String, WorkflowBundleError> {
597 let mut hasher = blake3::Hasher::new();
598 let mut canonical = canonical_workflow_bundle_manifest(bundle);
599 canonical.signature = None;
600 let manifest_bytes = serde_json::to_vec(&canonical)?;
601 hasher.update(&manifest_bytes);
602
603 let mut content_hashes = contents
604 .iter()
605 .map(|entry| blake3_hash_bytes(&entry.bytes))
606 .collect::<Vec<_>>();
607 content_hashes.sort();
608 for content_hash in content_hashes {
609 hasher.update(b"\n");
610 hasher.update(content_hash.as_bytes());
611 }
612
613 Ok(blake3_digest_string(hasher.finalize()))
614}
615
616pub fn verify_workflow_bundle_signature(
617 bundle: &WorkflowBundle,
618 contents: &[HarnpackEntry],
619) -> Result<(), WorkflowBundleError> {
620 let signature = bundle.signature.as_ref().ok_or_else(|| {
621 WorkflowBundleError::new(
622 WorkflowBundleErrorKind::InvalidSignature,
623 "workflow bundle is unsigned",
624 )
625 })?;
626 if signature.algorithm != "ed25519" {
627 return Err(WorkflowBundleError::new(
628 WorkflowBundleErrorKind::InvalidSignature,
629 format!(
630 "unsupported workflow bundle signature algorithm {}",
631 signature.algorithm
632 ),
633 ));
634 }
635 let expected_hash = workflow_bundle_hash(bundle, contents)?;
636 if signature.manifest_hash_blake3 != expected_hash {
637 return Err(WorkflowBundleError::new(
638 WorkflowBundleErrorKind::InvalidSignature,
639 format!(
640 "workflow bundle signature hash mismatch; expected {expected_hash}, found {}",
641 signature.manifest_hash_blake3
642 ),
643 ));
644 }
645 let public_key_bytes = decode_hex_exact::<32>(
646 "workflow bundle signature public_key",
647 &signature.public_key,
648 )?;
649 let signature_bytes =
650 decode_hex_exact::<64>("workflow bundle signature", &signature.signature)?;
651 let verifying_key = VerifyingKey::from_bytes(&public_key_bytes).map_err(|error| {
652 WorkflowBundleError::new(
653 WorkflowBundleErrorKind::InvalidSignature,
654 format!("workflow bundle signature public_key is invalid Ed25519: {error}"),
655 )
656 })?;
657 let ed25519_signature = Signature::from_bytes(&signature_bytes);
658 verifying_key
659 .verify(expected_hash.as_bytes(), &ed25519_signature)
660 .map_err(|error| {
661 WorkflowBundleError::new(
662 WorkflowBundleErrorKind::InvalidSignature,
663 format!("workflow bundle signature failed Ed25519 verification: {error}"),
664 )
665 })
666}
667
668pub fn build_harnpack(
669 bundle: &WorkflowBundle,
670 contents: &[HarnpackEntry],
671) -> Result<Vec<u8>, WorkflowBundleError> {
672 let manifest_bytes = canonical_workflow_bundle_manifest_bytes(bundle)?;
673 let mut entries = contents
674 .iter()
675 .map(|entry| {
676 normalize_archive_path(&entry.path).map(|path| (path, entry.bytes.clone(), entry.mode))
677 })
678 .collect::<Result<Vec<_>, _>>()?;
679 entries.sort_by(|left, right| left.0.cmp(&right.0));
680
681 let mut seen = BTreeSet::new();
682 for (path, _, _) in &entries {
683 if path == HARNPACK_MANIFEST_PATH {
684 return Err(WorkflowBundleError::new(
685 WorkflowBundleErrorKind::DuplicateArchiveEntry,
686 format!("archive content cannot replace {HARNPACK_MANIFEST_PATH}"),
687 ));
688 }
689 if !seen.insert(path.clone()) {
690 return Err(WorkflowBundleError::new(
691 WorkflowBundleErrorKind::DuplicateArchiveEntry,
692 format!("duplicate archive entry: {path}"),
693 ));
694 }
695 }
696
697 let mut tar_bytes = Vec::new();
698 {
699 let mut builder = tar::Builder::new(&mut tar_bytes);
700 append_harnpack_entry(
701 &mut builder,
702 HARNPACK_MANIFEST_PATH,
703 &manifest_bytes,
704 DEFAULT_HARNPACK_FILE_MODE,
705 )?;
706 for (path, bytes, mode) in entries {
707 append_harnpack_entry(&mut builder, &path, &bytes, mode)?;
708 }
709 builder.finish()?;
710 }
711
712 zstd::stream::encode_all(Cursor::new(tar_bytes), 0).map_err(Into::into)
713}
714
715pub fn read_harnpack(bytes: &[u8]) -> Result<HarnpackArchive, WorkflowBundleError> {
716 let tar_bytes = decompress_harnpack_zstd(bytes)?;
717 let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
718 let mut manifest = None;
719 let mut contents = Vec::new();
720 let mut seen = BTreeSet::new();
721
722 for entry in archive.entries()? {
723 let mut entry = entry?;
724 if entry.header().entry_type().is_dir() {
725 continue;
726 }
727 let path = normalize_archive_path(entry.path()?.as_ref())?;
728 if !seen.insert(path.clone()) {
729 return Err(WorkflowBundleError::new(
730 WorkflowBundleErrorKind::DuplicateArchiveEntry,
731 format!("duplicate archive entry: {path}"),
732 ));
733 }
734 let mode = entry.header().mode().unwrap_or(DEFAULT_HARNPACK_FILE_MODE);
735 let mut entry_bytes = Vec::new();
736 entry.read_to_end(&mut entry_bytes)?;
737
738 if path == HARNPACK_MANIFEST_PATH {
739 manifest = Some(parse_workflow_bundle_manifest(&entry_bytes)?);
740 } else {
741 contents.push(HarnpackEntry {
742 path: PathBuf::from(path),
743 bytes: entry_bytes,
744 mode,
745 });
746 }
747 }
748
749 contents.sort_by(|left, right| left.path.cmp(&right.path));
750 Ok(HarnpackArchive {
751 manifest: manifest.ok_or_else(|| {
752 WorkflowBundleError::new(
753 WorkflowBundleErrorKind::InvalidArchive,
754 format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
755 )
756 })?,
757 contents,
758 })
759}
760
761pub fn current_provider_catalog_hash_blake3() -> Result<String, WorkflowBundleError> {
762 let bytes = serde_json::to_vec(&crate::provider_catalog::artifact())?;
763 Ok(blake3_hash_bytes(&bytes))
764}
765
766pub fn workflow_graph_digest(graph: &WorkflowGraph) -> String {
767 let mut canonical = canonical_workflow_graph(graph);
768 canonical.audit_log.clear();
769 let bytes = serde_json::to_vec(&canonical).expect("workflow graph serializes");
770 let digest = Sha256::digest(bytes);
771 let hex = digest
772 .iter()
773 .map(|byte| format!("{byte:02x}"))
774 .collect::<String>();
775 format!("sha256:{hex}")
776}
777
778fn canonical_workflow_bundle_manifest(bundle: &WorkflowBundle) -> WorkflowBundle {
779 let mut canonical = bundle.clone();
780 canonical.workflow = canonical_workflow_graph(&bundle.workflow);
781 canonical.transitive_modules.sort_by(|left, right| {
782 (
783 path_sort_key(&left.path),
784 &left.source_hash_blake3,
785 &left.harnbc_hash_blake3,
786 )
787 .cmp(&(
788 path_sort_key(&right.path),
789 &right.source_hash_blake3,
790 &right.harnbc_hash_blake3,
791 ))
792 });
793 canonical.tool_manifest.sort_by(|left, right| {
794 (&left.name, &left.provider, &left.schema_hash_blake3).cmp(&(
795 &right.name,
796 &right.provider,
797 &right.schema_hash_blake3,
798 ))
799 });
800 canonical.sbom.packages.sort_by(|left, right| {
801 (&left.name, &left.version, &left.package_hash_blake3).cmp(&(
802 &right.name,
803 &right.version,
804 &right.package_hash_blake3,
805 ))
806 });
807 canonical.sbom.relationships.sort_by(|left, right| {
808 (&left.from, &left.to, &left.relationship_type).cmp(&(
809 &right.from,
810 &right.to,
811 &right.relationship_type,
812 ))
813 });
814 canonical
815}
816
817fn decode_hex_exact<const N: usize>(
818 label: &str,
819 value: &str,
820) -> Result<[u8; N], WorkflowBundleError> {
821 let bytes = hex::decode(value).map_err(|error| {
822 WorkflowBundleError::new(
823 WorkflowBundleErrorKind::InvalidSignature,
824 format!("{label} is not hex: {error}"),
825 )
826 })?;
827 bytes.try_into().map_err(|bytes: Vec<u8>| {
828 WorkflowBundleError::new(
829 WorkflowBundleErrorKind::InvalidSignature,
830 format!("{label} must decode to {N} bytes, got {}", bytes.len()),
831 )
832 })
833}
834
835fn append_harnpack_entry<W: std::io::Write>(
836 builder: &mut tar::Builder<W>,
837 path: &str,
838 bytes: &[u8],
839 mode: u32,
840) -> Result<(), WorkflowBundleError> {
841 let mut header = tar::Header::new_gnu();
842 header.set_path(path).map_err(|error| {
843 WorkflowBundleError::new(
844 WorkflowBundleErrorKind::UnsafeArchivePath,
845 format!("invalid archive path {path}: {error}"),
846 )
847 })?;
848 header.set_entry_type(tar::EntryType::Regular);
849 header.set_size(bytes.len() as u64);
850 header.set_mode(mode);
851 header.set_mtime(0);
852 header.set_uid(0);
853 header.set_gid(0);
854 header.set_cksum();
855 builder.append(&header, bytes)?;
856 Ok(())
857}
858
859fn normalize_archive_path(path: &Path) -> Result<String, WorkflowBundleError> {
860 let mut parts = Vec::new();
861 for component in path.components() {
862 match component {
863 Component::Normal(part) => {
864 let Some(part) = part.to_str() else {
865 return Err(WorkflowBundleError::new(
866 WorkflowBundleErrorKind::UnsafeArchivePath,
867 format!("archive path is not valid UTF-8: {}", path.display()),
868 ));
869 };
870 if part.is_empty() {
871 return Err(WorkflowBundleError::new(
872 WorkflowBundleErrorKind::UnsafeArchivePath,
873 "archive path contains an empty component",
874 ));
875 }
876 parts.push(part.to_string());
877 }
878 Component::CurDir => {}
879 Component::ParentDir | Component::RootDir | Component::Prefix(_) => {
880 return Err(WorkflowBundleError::new(
881 WorkflowBundleErrorKind::UnsafeArchivePath,
882 format!(
883 "archive path must be relative and contained: {}",
884 path.display()
885 ),
886 ));
887 }
888 }
889 }
890 if parts.is_empty() {
891 return Err(WorkflowBundleError::new(
892 WorkflowBundleErrorKind::UnsafeArchivePath,
893 "archive path is empty",
894 ));
895 }
896 Ok(parts.join("/"))
897}
898
899fn path_sort_key(path: &Path) -> String {
900 path.components()
901 .filter_map(|component| match component {
902 Component::Normal(part) => part.to_str().map(ToOwned::to_owned),
903 _ => None,
904 })
905 .collect::<Vec<_>>()
906 .join("/")
907}
908
909fn blake3_hash_bytes(bytes: &[u8]) -> String {
910 blake3_digest_string(blake3::hash(bytes))
911}
912
913fn blake3_digest_string(hash: blake3::Hash) -> String {
914 format!("blake3:{hash}")
915}
916
917pub fn validate_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundleValidationReport {
918 let canonical = canonical_workflow_graph(&bundle.workflow);
919 let mut report = WorkflowBundleValidationReport {
920 valid: true,
921 bundle_id: bundle.id.clone(),
922 workflow_id: canonical.id.clone(),
923 graph_digest: workflow_graph_digest(&canonical),
924 errors: Vec::new(),
925 warnings: Vec::new(),
926 };
927
928 validate_manifest_contract(bundle, &mut report);
929 validate_bundle_identity(bundle, &canonical, &mut report);
930 validate_triggers(bundle, &canonical, &mut report);
931 validate_prompt_capsules(bundle, &canonical, &mut report);
932 validate_policy(bundle, &mut report);
933 validate_connectors(bundle, &mut report);
934 validate_environment(bundle, &mut report);
935
936 let graph_report = validate_workflow(&canonical, None);
937 for error in graph_report.errors {
938 let node_id = workflow_diagnostic_node_id(&error, &canonical);
939 push_error(&mut report, "workflow", error, node_id);
940 }
941 for warning in graph_report.warnings {
942 let node_id = workflow_diagnostic_node_id(&warning, &canonical);
943 push_warning(&mut report, "workflow", warning, node_id);
944 }
945
946 if let Some(expected) = bundle.receipts.graph_digest.as_deref() {
947 if expected != report.graph_digest {
948 let actual = report.graph_digest.clone();
949 push_error(
950 &mut report,
951 "receipts.graph_digest",
952 format!("graph digest mismatch: expected {expected}, computed {actual}"),
953 None,
954 );
955 }
956 }
957 if let Some(expected_version) = bundle.receipts.workflow_version {
958 if expected_version != canonical.version {
959 push_error(
960 &mut report,
961 "receipts.workflow_version",
962 format!(
963 "workflow version mismatch: expected {expected_version}, computed {}",
964 canonical.version
965 ),
966 None,
967 );
968 }
969 }
970
971 report.valid = report.errors.is_empty();
972 report
973}
974
975pub fn preview_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundlePreview {
976 let canonical = canonical_workflow_graph(&bundle.workflow);
977 let validation = validate_workflow_bundle(bundle);
978 let graph = export_workflow_bundle_graph(bundle, &validation);
979 let mermaid = graph.mermaid.clone();
980 let triggers_by_node = triggers_by_node(bundle);
981 let capsules_by_node = capsules_by_node(bundle);
982 let mut nodes = Vec::new();
983
984 for (node_id, node) in &canonical.nodes {
985 let mut outgoing = canonical
986 .edges
987 .iter()
988 .filter(|edge| edge.from == *node_id)
989 .map(|edge| edge.to.clone())
990 .collect::<Vec<_>>();
991 outgoing.sort();
992 outgoing.dedup();
993 nodes.push(WorkflowBundlePreviewNode {
994 id: node_id.clone(),
995 kind: node.kind.clone(),
996 label: node.task_label.clone(),
997 prompt_capsule: capsules_by_node.get(node_id).cloned(),
998 trigger_ids: triggers_by_node.get(node_id).cloned().unwrap_or_default(),
999 outgoing,
1000 });
1001 }
1002
1003 WorkflowBundlePreview {
1004 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1005 bundle_id: bundle.id.clone(),
1006 bundle_version: bundle.version.clone(),
1007 workflow_id: canonical.id.clone(),
1008 workflow_version: canonical.version,
1009 graph_digest: validation.graph_digest.clone(),
1010 validation,
1011 graph,
1012 mermaid,
1013 triggers: bundle.triggers.clone(),
1014 connectors: bundle.connectors.clone(),
1015 environment: bundle.environment.clone(),
1016 nodes,
1017 edges: sorted_edges(&canonical),
1018 }
1019}
1020
1021pub fn export_workflow_bundle_graph(
1022 bundle: &WorkflowBundle,
1023 validation: &WorkflowBundleValidationReport,
1024) -> WorkflowBundleGraphExport {
1025 let canonical = canonical_workflow_graph(&bundle.workflow);
1026 let mut nodes = Vec::new();
1027 let mut edges = Vec::new();
1028 let mut editable_fields = Vec::new();
1029 let capsules_by_node = capsules_by_node(bundle);
1030 let catchup_enabled = bundle.policy.catchup.mode != "none";
1031 let retry_can_dlq = bundle.policy.retry.max_attempts > 1;
1032
1033 for (index, connector) in bundle.connectors.iter().enumerate() {
1034 let node_fields = connector_editable_fields(index, connector);
1035 editable_fields.extend(node_fields.clone());
1036 nodes.push(WorkflowBundleGraphNode {
1037 id: connector_graph_id(&connector.id),
1038 node_type: "connector_call".to_string(),
1039 label: connector_label(connector),
1040 workflow_node_id: None,
1041 trigger_id: None,
1042 connector_id: Some(connector.id.clone()),
1043 editable_fields: node_fields,
1044 metadata: BTreeMap::from([
1045 (
1046 "provider_id".to_string(),
1047 serde_json::json!(connector.provider_id),
1048 ),
1049 ("scopes".to_string(), serde_json::json!(connector.scopes)),
1050 ]),
1051 });
1052 }
1053
1054 let catchup_fields = catchup_editable_fields();
1055 let retry_fields = retry_editable_fields();
1056 if catchup_enabled {
1057 let node_fields = catchup_fields.clone();
1058 editable_fields.extend(node_fields.clone());
1059 nodes.push(WorkflowBundleGraphNode {
1060 id: catchup_graph_id(),
1061 node_type: "catchup".to_string(),
1062 label: "Catch up".to_string(),
1063 workflow_node_id: None,
1064 trigger_id: None,
1065 connector_id: None,
1066 editable_fields: node_fields,
1067 metadata: BTreeMap::from([(
1068 "mode".to_string(),
1069 serde_json::json!(bundle.policy.catchup.mode),
1070 )]),
1071 });
1072 } else {
1073 editable_fields.extend(catchup_fields);
1074 }
1075 if retry_can_dlq {
1076 let node_fields = retry_fields.clone();
1077 editable_fields.extend(node_fields.clone());
1078 nodes.push(WorkflowBundleGraphNode {
1079 id: dlq_graph_id(),
1080 node_type: "dlq".to_string(),
1081 label: "Dead letter queue".to_string(),
1082 workflow_node_id: None,
1083 trigger_id: None,
1084 connector_id: None,
1085 editable_fields: node_fields,
1086 metadata: BTreeMap::from([(
1087 "max_attempts".to_string(),
1088 serde_json::json!(bundle.policy.retry.max_attempts),
1089 )]),
1090 });
1091 } else {
1092 editable_fields.extend(retry_fields);
1093 }
1094
1095 for (index, trigger) in bundle.triggers.iter().enumerate() {
1096 let node_fields = trigger_editable_fields(index, trigger);
1097 editable_fields.extend(node_fields.clone());
1098 nodes.push(WorkflowBundleGraphNode {
1099 id: trigger_graph_id(&trigger.id),
1100 node_type: "trigger".to_string(),
1101 label: trigger_label(trigger),
1102 workflow_node_id: trigger.node_id.clone(),
1103 trigger_id: Some(trigger.id.clone()),
1104 connector_id: None,
1105 editable_fields: node_fields,
1106 metadata: BTreeMap::from([
1107 ("kind".to_string(), serde_json::json!(trigger.kind)),
1108 ("provider".to_string(), serde_json::json!(trigger.provider)),
1109 ("events".to_string(), serde_json::json!(trigger.events)),
1110 ]),
1111 });
1112 if let Some(provider) = trigger.provider.as_deref() {
1113 if let Some(connector) = bundle
1114 .connectors
1115 .iter()
1116 .find(|connector| connector.provider_id == provider || connector.id == provider)
1117 {
1118 edges.push(WorkflowBundleGraphEdge {
1119 from: connector_graph_id(&connector.id),
1120 to: trigger_graph_id(&trigger.id),
1121 label: Some("binds".to_string()),
1122 branch: None,
1123 });
1124 }
1125 }
1126 let target = trigger
1127 .node_id
1128 .clone()
1129 .unwrap_or_else(|| canonical.entry.clone());
1130 if catchup_enabled {
1131 edges.push(WorkflowBundleGraphEdge {
1132 from: trigger_graph_id(&trigger.id),
1133 to: catchup_graph_id(),
1134 label: Some(bundle.policy.catchup.mode.clone()),
1135 branch: Some("catchup".to_string()),
1136 });
1137 edges.push(WorkflowBundleGraphEdge {
1138 from: catchup_graph_id(),
1139 to: workflow_graph_id(&target),
1140 label: Some("dispatch".to_string()),
1141 branch: None,
1142 });
1143 } else {
1144 edges.push(WorkflowBundleGraphEdge {
1145 from: trigger_graph_id(&trigger.id),
1146 to: workflow_graph_id(&target),
1147 label: Some("dispatch".to_string()),
1148 branch: None,
1149 });
1150 }
1151 }
1152
1153 for (node_id, node) in &canonical.nodes {
1154 let capsule_id = capsules_by_node.get(node_id);
1155 let node_fields = workflow_node_editable_fields(node_id, capsule_id);
1156 editable_fields.extend(node_fields.clone());
1157 nodes.push(WorkflowBundleGraphNode {
1158 id: workflow_graph_id(node_id),
1159 node_type: workflow_node_type(&node.kind),
1160 label: workflow_node_label(node_id, node),
1161 workflow_node_id: Some(node_id.clone()),
1162 trigger_id: None,
1163 connector_id: None,
1164 editable_fields: node_fields,
1165 metadata: BTreeMap::from([
1166 ("kind".to_string(), serde_json::json!(node.kind)),
1167 ("task_label".to_string(), serde_json::json!(node.task_label)),
1168 (
1169 "prompt_capsule".to_string(),
1170 serde_json::json!(capsule_id.cloned()),
1171 ),
1172 ]),
1173 });
1174 }
1175
1176 for edge in sorted_edges(&canonical) {
1177 edges.push(WorkflowBundleGraphEdge {
1178 from: workflow_graph_id(&edge.from),
1179 to: workflow_graph_id(&edge.to),
1180 label: edge.label.clone(),
1181 branch: edge.branch.clone(),
1182 });
1183 }
1184
1185 let outgoing: BTreeSet<&str> = canonical
1186 .edges
1187 .iter()
1188 .map(|edge| edge.from.as_str())
1189 .collect();
1190 for node_id in canonical.nodes.keys() {
1191 if !outgoing.contains(node_id.as_str()) {
1192 edges.push(WorkflowBundleGraphEdge {
1193 from: workflow_graph_id(node_id),
1194 to: terminal_completed_graph_id(),
1195 label: Some("completed".to_string()),
1196 branch: Some("completed".to_string()),
1197 });
1198 }
1199 if retry_can_dlq {
1200 edges.push(WorkflowBundleGraphEdge {
1201 from: workflow_graph_id(node_id),
1202 to: dlq_graph_id(),
1203 label: Some("retry exhausted".to_string()),
1204 branch: Some("failed".to_string()),
1205 });
1206 }
1207 }
1208
1209 nodes.push(WorkflowBundleGraphNode {
1210 id: terminal_completed_graph_id(),
1211 node_type: "terminal".to_string(),
1212 label: "Completed".to_string(),
1213 workflow_node_id: None,
1214 trigger_id: None,
1215 connector_id: None,
1216 editable_fields: Vec::new(),
1217 metadata: BTreeMap::from([("status".to_string(), serde_json::json!("completed"))]),
1218 });
1219 nodes.push(WorkflowBundleGraphNode {
1220 id: terminal_failed_graph_id(),
1221 node_type: "terminal".to_string(),
1222 label: "Failed".to_string(),
1223 workflow_node_id: None,
1224 trigger_id: None,
1225 connector_id: None,
1226 editable_fields: Vec::new(),
1227 metadata: BTreeMap::from([("status".to_string(), serde_json::json!("failed"))]),
1228 });
1229 if retry_can_dlq {
1230 edges.push(WorkflowBundleGraphEdge {
1231 from: dlq_graph_id(),
1232 to: terminal_failed_graph_id(),
1233 label: Some("failed".to_string()),
1234 branch: Some("failed".to_string()),
1235 });
1236 }
1237
1238 nodes.sort_by(|left, right| left.id.cmp(&right.id));
1239 edges.sort_by(|left, right| {
1240 (&left.from, &left.to, &left.branch, &left.label).cmp(&(
1241 &right.from,
1242 &right.to,
1243 &right.branch,
1244 &right.label,
1245 ))
1246 });
1247 editable_fields.sort_by(|left, right| left.id.cmp(&right.id));
1248
1249 let diagnostics = validation
1250 .errors
1251 .iter()
1252 .chain(validation.warnings.iter())
1253 .map(|diagnostic| WorkflowBundleGraphDiagnostic {
1254 severity: diagnostic.severity.clone(),
1255 path: diagnostic.path.clone(),
1256 message: diagnostic.message.clone(),
1257 node_id: diagnostic.node_id.clone(),
1258 graph_node_id: diagnostic.node_id.as_deref().map(workflow_graph_id),
1259 })
1260 .collect::<Vec<_>>();
1261 let mermaid = render_workflow_bundle_mermaid(&nodes, &edges);
1262
1263 WorkflowBundleGraphExport {
1264 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1265 graph_id: canonical.id,
1266 graph_digest: validation.graph_digest.clone(),
1267 nodes,
1268 edges,
1269 diagnostics,
1270 editable_fields,
1271 mermaid,
1272 }
1273}
1274
1275pub fn run_workflow_bundle(
1276 bundle: &WorkflowBundle,
1277 request: WorkflowBundleRunRequest,
1278) -> Result<WorkflowBundleRunReceipt, WorkflowBundleValidationReport> {
1279 let validation = validate_workflow_bundle(bundle);
1280 if !validation.valid {
1281 return Err(validation);
1282 }
1283
1284 let canonical = canonical_workflow_graph(&bundle.workflow);
1285 let trigger_id = match request.trigger_id {
1286 Some(trigger_id)
1287 if !bundle
1288 .triggers
1289 .iter()
1290 .any(|trigger| trigger.id == trigger_id) =>
1291 {
1292 let mut report = validation;
1293 push_error(
1294 &mut report,
1295 "trigger_id",
1296 format!("unknown trigger id: {trigger_id}"),
1297 None,
1298 );
1299 report.valid = false;
1300 return Err(report);
1301 }
1302 Some(trigger_id) => Some(trigger_id),
1303 None => bundle.triggers.first().map(|trigger| trigger.id.clone()),
1304 };
1305 let mut event_ids = bundle.receipts.event_ids.clone();
1306 if let Some(event_id) = request.event_id {
1307 if !event_ids.contains(&event_id) {
1308 event_ids.push(event_id);
1309 }
1310 }
1311 let run_id = bundle
1312 .receipts
1313 .run_id
1314 .clone()
1315 .unwrap_or_else(|| default_run_id(bundle, &validation.graph_digest));
1316 let capsules_by_node = capsules_by_node(bundle);
1317 let executed_nodes = execution_order(&canonical)
1318 .into_iter()
1319 .map(|node_id| {
1320 let node = canonical
1321 .nodes
1322 .get(&node_id)
1323 .expect("execution order only contains known nodes");
1324 WorkflowBundleRunNodeReceipt {
1325 node_id: node_id.clone(),
1326 kind: node.kind.clone(),
1327 prompt_capsule: capsules_by_node.get(&node_id).cloned(),
1328 status: "completed".to_string(),
1329 }
1330 })
1331 .collect();
1332
1333 Ok(WorkflowBundleRunReceipt {
1334 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1335 receipt_type: WORKFLOW_BUNDLE_RECEIPT_TYPE.to_string(),
1336 bundle_id: bundle.id.clone(),
1337 bundle_version: bundle.version.clone(),
1338 workflow_id: canonical.id,
1339 workflow_version: canonical.version,
1340 graph_digest: validation.graph_digest,
1341 run_id,
1342 trigger_id,
1343 event_ids,
1344 status: "completed".to_string(),
1345 executed_nodes,
1346 policy: bundle.policy.clone(),
1347 connectors: bundle.connectors.clone(),
1348 environment: bundle.environment.clone(),
1349 })
1350}
1351
1352fn canonical_workflow_graph(graph: &WorkflowGraph) -> WorkflowGraph {
1353 let mut canonical = graph.clone();
1354 if canonical.type_name.is_empty() {
1355 canonical.type_name = "workflow_graph".to_string();
1356 }
1357 if canonical.version == 0 {
1358 canonical.version = 1;
1359 }
1360 if canonical.entry.is_empty() {
1361 canonical.entry = canonical.nodes.keys().next().cloned().unwrap_or_default();
1362 }
1363 for (node_id, node) in &mut canonical.nodes {
1364 if node.id.is_none() {
1365 node.id = Some(node_id.clone());
1366 }
1367 if node.kind.is_empty() {
1368 node.kind = "stage".to_string();
1369 }
1370 if node.retry_policy.max_attempts == 0 {
1371 node.retry_policy.max_attempts = 1;
1372 }
1373 }
1374 canonical.edges = sorted_edges(&canonical);
1375 canonical
1376}
1377
1378fn sorted_edges(graph: &WorkflowGraph) -> Vec<WorkflowEdge> {
1379 let mut edges = graph.edges.clone();
1380 edges.sort_by(|left, right| {
1381 (
1382 &left.from,
1383 &left.to,
1384 left.branch.as_deref(),
1385 left.label.as_deref(),
1386 )
1387 .cmp(&(
1388 &right.from,
1389 &right.to,
1390 right.branch.as_deref(),
1391 right.label.as_deref(),
1392 ))
1393 });
1394 edges
1395}
1396
1397fn validate_bundle_identity(
1398 bundle: &WorkflowBundle,
1399 graph: &WorkflowGraph,
1400 report: &mut WorkflowBundleValidationReport,
1401) {
1402 if bundle.schema_version != WORKFLOW_BUNDLE_SCHEMA_VERSION {
1403 push_error(
1404 report,
1405 "schema_version",
1406 format!(
1407 "unsupported schema_version {}; expected {}",
1408 bundle.schema_version, WORKFLOW_BUNDLE_SCHEMA_VERSION
1409 ),
1410 None,
1411 );
1412 }
1413 if bundle.id.trim().is_empty() {
1414 push_error(report, "id", "bundle id is required", None);
1415 }
1416 if bundle.version.trim().is_empty() {
1417 push_error(report, "version", "bundle version is required", None);
1418 }
1419 if graph.id.trim().is_empty() {
1420 push_error(
1421 report,
1422 "workflow.id",
1423 "workflow id is required for portable bundles",
1424 None,
1425 );
1426 }
1427 if graph.nodes.is_empty() {
1428 push_error(
1429 report,
1430 "workflow.nodes",
1431 "workflow must contain nodes",
1432 None,
1433 );
1434 }
1435 for (node_id, node) in &graph.nodes {
1436 if node_id.trim().is_empty() {
1437 push_error(report, "workflow.nodes", "node id is required", None);
1438 }
1439 if node.id.as_deref().is_some_and(|id| id != node_id) {
1440 push_error(
1441 report,
1442 format!("workflow.nodes.{node_id}.id"),
1443 "node id field must match its map key",
1444 Some(node_id.clone()),
1445 );
1446 }
1447 }
1448}
1449
1450fn validate_manifest_contract(
1451 bundle: &WorkflowBundle,
1452 report: &mut WorkflowBundleValidationReport,
1453) {
1454 validate_relative_path(
1455 report,
1456 "entrypoint",
1457 &bundle.entrypoint,
1458 "entrypoint is required",
1459 );
1460 if bundle.transitive_modules.is_empty() {
1461 push_error(
1462 report,
1463 "transitive_modules",
1464 "at least one transitive module entry is required",
1465 None,
1466 );
1467 }
1468 let mut module_paths = BTreeSet::new();
1469 for (index, module) in bundle.transitive_modules.iter().enumerate() {
1470 let path = format!("transitive_modules[{index}]");
1471 validate_relative_path(
1472 report,
1473 format!("{path}.path"),
1474 &module.path,
1475 "module path is required",
1476 );
1477 if !module.path.as_os_str().is_empty() && !module_paths.insert(path_sort_key(&module.path))
1478 {
1479 push_error(
1480 report,
1481 format!("{path}.path"),
1482 format!(
1483 "duplicate transitive module path: {}",
1484 module.path.display()
1485 ),
1486 None,
1487 );
1488 }
1489 validate_blake3_hash(
1490 report,
1491 format!("{path}.source_hash_blake3"),
1492 &module.source_hash_blake3,
1493 true,
1494 );
1495 validate_blake3_hash(
1496 report,
1497 format!("{path}.harnbc_hash_blake3"),
1498 &module.harnbc_hash_blake3,
1499 true,
1500 );
1501 }
1502 if bundle.stdlib_version.trim().is_empty() {
1503 push_error(report, "stdlib_version", "stdlib_version is required", None);
1504 }
1505 if bundle.harn_version.trim().is_empty() {
1506 push_error(report, "harn_version", "harn_version is required", None);
1507 }
1508 validate_blake3_hash(
1509 report,
1510 "provider_catalog_hash",
1511 &bundle.provider_catalog_hash,
1512 true,
1513 );
1514
1515 let mut tool_names = BTreeSet::new();
1516 for (index, tool) in bundle.tool_manifest.iter().enumerate() {
1517 let path = format!("tool_manifest[{index}]");
1518 if tool.name.trim().is_empty() {
1519 push_error(
1520 report,
1521 format!("{path}.name"),
1522 "tool manifest entry name is required",
1523 None,
1524 );
1525 } else if !tool_names.insert(tool.name.clone()) {
1526 push_error(
1527 report,
1528 format!("{path}.name"),
1529 format!("duplicate tool manifest entry: {}", tool.name),
1530 None,
1531 );
1532 }
1533 if let Some(hash) = tool.schema_hash_blake3.as_deref() {
1534 validate_blake3_hash(report, format!("{path}.schema_hash_blake3"), hash, true);
1535 }
1536 }
1537
1538 if bundle.sbom.format.trim().is_empty() {
1539 push_error(report, "sbom.format", "SBOM format is required", None);
1540 }
1541 if bundle.sbom.version.trim().is_empty() {
1542 push_error(report, "sbom.version", "SBOM version is required", None);
1543 }
1544 let mut sbom_packages = BTreeSet::new();
1545 for (index, package) in bundle.sbom.packages.iter().enumerate() {
1546 let path = format!("sbom.packages[{index}]");
1547 if package.name.trim().is_empty() {
1548 push_error(
1549 report,
1550 format!("{path}.name"),
1551 "SBOM package name is required",
1552 None,
1553 );
1554 } else if !sbom_packages.insert(package.name.clone()) {
1555 push_error(
1556 report,
1557 format!("{path}.name"),
1558 format!("duplicate SBOM package: {}", package.name),
1559 None,
1560 );
1561 }
1562 if let Some(hash) = package.package_hash_blake3.as_deref() {
1563 validate_blake3_hash(report, format!("{path}.package_hash_blake3"), hash, true);
1564 }
1565 }
1566 for (index, relationship) in bundle.sbom.relationships.iter().enumerate() {
1567 let path = format!("sbom.relationships[{index}]");
1568 if relationship.from.trim().is_empty() {
1569 push_error(
1570 report,
1571 format!("{path}.from"),
1572 "SBOM relationship source is required",
1573 None,
1574 );
1575 }
1576 if relationship.to.trim().is_empty() {
1577 push_error(
1578 report,
1579 format!("{path}.to"),
1580 "SBOM relationship target is required",
1581 None,
1582 );
1583 }
1584 if relationship.relationship_type.trim().is_empty() {
1585 push_error(
1586 report,
1587 format!("{path}.relationship_type"),
1588 "SBOM relationship type is required",
1589 None,
1590 );
1591 }
1592 }
1593
1594 if let Some(parent_id) = bundle.parent_trust_record_id.as_deref() {
1595 if parent_id.trim().is_empty() {
1596 push_error(
1597 report,
1598 "parent_trust_record_id",
1599 "parent_trust_record_id cannot be empty when present",
1600 None,
1601 );
1602 }
1603 }
1604 if let Some(signature) = &bundle.signature {
1605 if signature.algorithm.trim().is_empty() {
1606 push_error(
1607 report,
1608 "signature.algorithm",
1609 "signature algorithm is required",
1610 None,
1611 );
1612 } else if signature.algorithm != "ed25519" {
1613 push_error(
1614 report,
1615 "signature.algorithm",
1616 "signature algorithm must be ed25519",
1617 None,
1618 );
1619 }
1620 if signature.public_key.trim().is_empty() {
1621 push_error(
1622 report,
1623 "signature.public_key",
1624 "signature public_key is required",
1625 None,
1626 );
1627 }
1628 if signature.signature.trim().is_empty() {
1629 push_error(
1630 report,
1631 "signature.signature",
1632 "signature value is required",
1633 None,
1634 );
1635 }
1636 validate_blake3_hash(
1637 report,
1638 "signature.manifest_hash_blake3",
1639 &signature.manifest_hash_blake3,
1640 true,
1641 );
1642 }
1643}
1644
1645fn validate_relative_path(
1646 report: &mut WorkflowBundleValidationReport,
1647 path: impl Into<String>,
1648 value: &Path,
1649 empty_message: &str,
1650) {
1651 let path = path.into();
1652 if value.as_os_str().is_empty() {
1653 push_error(report, path, empty_message, None);
1654 return;
1655 }
1656 if normalize_archive_path(value).is_err() {
1657 push_error(
1658 report,
1659 path,
1660 format!("path must be relative and contained: {}", value.display()),
1661 None,
1662 );
1663 }
1664}
1665
1666fn validate_blake3_hash(
1667 report: &mut WorkflowBundleValidationReport,
1668 path: impl Into<String>,
1669 value: &str,
1670 required: bool,
1671) {
1672 let path = path.into();
1673 if value.trim().is_empty() {
1674 if required {
1675 push_error(report, path, "BLAKE3 hash is required", None);
1676 }
1677 return;
1678 }
1679 let Some(hex) = value.strip_prefix("blake3:") else {
1680 push_error(report, path, "BLAKE3 hash must use blake3:<hex>", None);
1681 return;
1682 };
1683 if hex.len() != 64
1684 || !hex
1685 .bytes()
1686 .all(|byte| byte.is_ascii_digit() || matches!(byte, b'a'..=b'f'))
1687 {
1688 push_error(
1689 report,
1690 path,
1691 "BLAKE3 hash must contain 64 lowercase hex digits",
1692 None,
1693 );
1694 }
1695}
1696
1697fn validate_triggers(
1698 bundle: &WorkflowBundle,
1699 graph: &WorkflowGraph,
1700 report: &mut WorkflowBundleValidationReport,
1701) {
1702 if bundle.triggers.is_empty() {
1703 push_warning(report, "triggers", "bundle declares no triggers", None);
1704 }
1705 let mut ids = BTreeSet::new();
1706 for (index, trigger) in bundle.triggers.iter().enumerate() {
1707 let path = format!("triggers[{index}]");
1708 if trigger.id.trim().is_empty() {
1709 push_error(report, format!("{path}.id"), "trigger id is required", None);
1710 } else if !ids.insert(trigger.id.clone()) {
1711 push_error(
1712 report,
1713 format!("{path}.id"),
1714 format!("duplicate trigger id: {}", trigger.id),
1715 None,
1716 );
1717 }
1718 match trigger.kind.as_str() {
1719 "github" => {
1720 if trigger.provider.as_deref() != Some("github") {
1721 push_error(
1722 report,
1723 format!("{path}.provider"),
1724 "github triggers require provider=\"github\"",
1725 None,
1726 );
1727 }
1728 if trigger.events.is_empty() {
1729 push_error(
1730 report,
1731 format!("{path}.events"),
1732 "github triggers require at least one event",
1733 None,
1734 );
1735 }
1736 }
1737 "cron" if trigger.schedule.is_none() => push_error(
1738 report,
1739 format!("{path}.schedule"),
1740 "cron triggers require schedule",
1741 None,
1742 ),
1743 "delay" if trigger.delay.is_none() => push_error(
1744 report,
1745 format!("{path}.delay"),
1746 "delay triggers require delay",
1747 None,
1748 ),
1749 "webhook" if trigger.webhook_path.is_none() => push_error(
1750 report,
1751 format!("{path}.webhook_path"),
1752 "webhook triggers require webhook_path",
1753 None,
1754 ),
1755 "mcp" if trigger.mcp_tool.is_none() => push_error(
1756 report,
1757 format!("{path}.mcp_tool"),
1758 "mcp triggers require mcp_tool",
1759 None,
1760 ),
1761 "manual" => {}
1762 "" => push_error(
1763 report,
1764 format!("{path}.kind"),
1765 "trigger kind is required",
1766 None,
1767 ),
1768 other
1769 if !matches!(
1770 other,
1771 "github" | "cron" | "delay" | "webhook" | "mcp" | "manual"
1772 ) =>
1773 {
1774 push_error(
1775 report,
1776 format!("{path}.kind"),
1777 format!("unsupported trigger kind: {other}"),
1778 None,
1779 );
1780 }
1781 _ => {}
1782 }
1783 if let Some(node_id) = trigger.node_id.as_deref() {
1784 if !graph.nodes.contains_key(node_id) {
1785 push_error(
1786 report,
1787 format!("{path}.node_id"),
1788 format!("trigger references unknown node: {node_id}"),
1789 Some(node_id.to_string()),
1790 );
1791 }
1792 }
1793 }
1794}
1795
1796fn validate_prompt_capsules(
1797 bundle: &WorkflowBundle,
1798 graph: &WorkflowGraph,
1799 report: &mut WorkflowBundleValidationReport,
1800) {
1801 let trigger_ids: BTreeSet<&str> = bundle
1802 .triggers
1803 .iter()
1804 .map(|trigger| trigger.id.as_str())
1805 .collect();
1806 let mut node_refs = BTreeSet::new();
1807 for (key, capsule) in &bundle.prompt_capsules {
1808 let path = format!("prompt_capsules.{key}");
1809 if capsule.id.trim().is_empty() {
1810 push_error(
1811 report,
1812 format!("{path}.id"),
1813 "prompt capsule id is required",
1814 None,
1815 );
1816 } else if capsule.id != *key {
1817 push_error(
1818 report,
1819 format!("{path}.id"),
1820 "prompt capsule id must match its map key",
1821 None,
1822 );
1823 }
1824 if capsule.prompt.trim().is_empty() {
1825 push_error(
1826 report,
1827 format!("{path}.prompt"),
1828 "prompt capsule prompt is required",
1829 Some(capsule.node_id.clone()),
1830 );
1831 }
1832 if !graph.nodes.contains_key(&capsule.node_id) {
1833 push_error(
1834 report,
1835 format!("{path}.node_id"),
1836 format!(
1837 "prompt capsule references unknown node: {}",
1838 capsule.node_id
1839 ),
1840 Some(capsule.node_id.clone()),
1841 );
1842 }
1843 if !capsule.node_id.is_empty() && !node_refs.insert(capsule.node_id.clone()) {
1844 push_error(
1845 report,
1846 format!("{path}.node_id"),
1847 format!("multiple prompt capsules target node {}", capsule.node_id),
1848 Some(capsule.node_id.clone()),
1849 );
1850 }
1851 if let Some(trigger_id) = capsule.trigger_id.as_deref() {
1852 if !trigger_ids.contains(trigger_id) {
1853 push_error(
1854 report,
1855 format!("{path}.trigger_id"),
1856 format!("prompt capsule references unknown trigger: {trigger_id}"),
1857 Some(capsule.node_id.clone()),
1858 );
1859 }
1860 }
1861 }
1862}
1863
1864fn validate_policy(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1865 if !matches!(
1866 bundle.policy.autonomy_tier.as_str(),
1867 "shadow" | "suggest" | "act_with_approval" | "act_auto"
1868 ) {
1869 push_error(
1870 report,
1871 "policy.autonomy_tier",
1872 "autonomy_tier must be shadow, suggest, act_with_approval, or act_auto",
1873 None,
1874 );
1875 }
1876 if bundle.policy.retry.max_attempts == 0 {
1877 push_error(
1878 report,
1879 "policy.retry.max_attempts",
1880 "retry.max_attempts must be at least 1",
1881 None,
1882 );
1883 }
1884 if !matches!(
1885 bundle.policy.catchup.mode.as_str(),
1886 "none" | "latest" | "all"
1887 ) {
1888 push_error(
1889 report,
1890 "policy.catchup.mode",
1891 "catchup.mode must be none, latest, or all",
1892 None,
1893 );
1894 }
1895}
1896
1897fn validate_connectors(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1898 let mut ids = BTreeSet::new();
1899 let provider_ids: BTreeSet<&str> = bundle
1900 .connectors
1901 .iter()
1902 .map(|connector| connector.provider_id.as_str())
1903 .collect();
1904 for (index, connector) in bundle.connectors.iter().enumerate() {
1905 let path = format!("connectors[{index}]");
1906 if connector.id.trim().is_empty() {
1907 push_error(
1908 report,
1909 format!("{path}.id"),
1910 "connector id is required",
1911 None,
1912 );
1913 } else if !ids.insert(connector.id.clone()) {
1914 push_error(
1915 report,
1916 format!("{path}.id"),
1917 format!("duplicate connector id: {}", connector.id),
1918 None,
1919 );
1920 }
1921 if connector.provider_id.trim().is_empty() {
1922 push_error(
1923 report,
1924 format!("{path}.provider_id"),
1925 "connector provider_id is required",
1926 None,
1927 );
1928 }
1929 }
1930 for trigger in &bundle.triggers {
1931 if let Some(provider) = trigger.provider.as_deref() {
1932 if !provider_ids.contains(provider) {
1933 push_warning(
1934 report,
1935 "connectors",
1936 format!(
1937 "trigger {} references provider {provider} with no connector requirement",
1938 trigger.id
1939 ),
1940 trigger.node_id.clone(),
1941 );
1942 }
1943 }
1944 }
1945}
1946
1947fn validate_environment(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1948 if !matches!(
1949 bundle.environment.worktree_policy.as_str(),
1950 "reuse_current" | "new_worktree" | "host_managed"
1951 ) {
1952 push_error(
1953 report,
1954 "environment.worktree_policy",
1955 "worktree_policy must be reuse_current, new_worktree, or host_managed",
1956 None,
1957 );
1958 }
1959}
1960
1961fn workflow_diagnostic_node_id(message: &str, graph: &WorkflowGraph) -> Option<String> {
1962 for prefix in [
1963 "node is unreachable: ",
1964 "edge.from references unknown node: ",
1965 "edge.to references unknown node: ",
1966 "entry node does not exist: ",
1967 ] {
1968 if let Some(node_id) = message.strip_prefix(prefix) {
1969 return Some(node_id.to_string());
1970 }
1971 }
1972 if let Some(rest) = message.strip_prefix("node ") {
1973 if let Some((node_id, _)) = rest.split_once(':') {
1974 return Some(node_id.to_string());
1975 }
1976 }
1977 graph
1978 .nodes
1979 .keys()
1980 .find(|node_id| message.contains(&format!("node {node_id}:")))
1981 .cloned()
1982}
1983
1984fn workflow_graph_id(node_id: &str) -> String {
1985 format!("node/{node_id}")
1986}
1987
1988fn trigger_graph_id(trigger_id: &str) -> String {
1989 format!("trigger/{trigger_id}")
1990}
1991
1992fn connector_graph_id(connector_id: &str) -> String {
1993 format!("connector/{connector_id}")
1994}
1995
1996fn catchup_graph_id() -> String {
1997 "policy/catchup".to_string()
1998}
1999
2000fn dlq_graph_id() -> String {
2001 "policy/dlq".to_string()
2002}
2003
2004fn terminal_completed_graph_id() -> String {
2005 "terminal/completed".to_string()
2006}
2007
2008fn terminal_failed_graph_id() -> String {
2009 "terminal/failed".to_string()
2010}
2011
2012fn workflow_node_type(kind: &str) -> String {
2013 match kind {
2014 "action" => "action",
2015 "stage" | "agent" => "agent",
2016 "subagent" | "worker" => "subagent",
2017 "wait" | "waitpoint" | "delay" => "wait",
2018 "approval" | "hitl" => "approval",
2019 "connector" | "connector_call" => "connector_call",
2020 "notification" | "notify" => "notification",
2021 "terminal" | "success" | "failure" => "terminal",
2022 other if other.trim().is_empty() => "agent",
2023 other => other,
2024 }
2025 .to_string()
2026}
2027
2028fn workflow_node_label(node_id: &str, node: &super::WorkflowNode) -> String {
2029 node.task_label
2030 .clone()
2031 .or_else(|| node.prompt.clone())
2032 .map(|label| label.trim().to_string())
2033 .filter(|label| !label.is_empty())
2034 .unwrap_or_else(|| node_id.to_string())
2035}
2036
2037fn trigger_label(trigger: &WorkflowBundleTrigger) -> String {
2038 if !trigger.events.is_empty() {
2039 format!("{}: {}", trigger.kind, trigger.events.join(", "))
2040 } else if let Some(schedule) = trigger.schedule.as_deref() {
2041 format!("cron: {schedule}")
2042 } else if let Some(delay) = trigger.delay.as_deref() {
2043 format!("delay: {delay}")
2044 } else {
2045 trigger.id.clone()
2046 }
2047}
2048
2049fn connector_label(connector: &ConnectorRequirement) -> String {
2050 if connector.provider_id.is_empty() || connector.provider_id == connector.id {
2051 connector.id.clone()
2052 } else {
2053 format!("{} ({})", connector.id, connector.provider_id)
2054 }
2055}
2056
2057fn editable_field(
2058 id: impl Into<String>,
2059 label: impl Into<String>,
2060 json_pointer: impl Into<String>,
2061 value_type: impl Into<String>,
2062 required: bool,
2063 enum_values: &[&str],
2064) -> WorkflowBundleEditableField {
2065 WorkflowBundleEditableField {
2066 id: id.into(),
2067 label: label.into(),
2068 json_pointer: json_pointer.into(),
2069 value_type: value_type.into(),
2070 required,
2071 enum_values: enum_values
2072 .iter()
2073 .map(|value| (*value).to_string())
2074 .collect(),
2075 }
2076}
2077
2078fn json_pointer_segment(value: &str) -> String {
2079 value.replace('~', "~0").replace('/', "~1")
2080}
2081
2082fn trigger_editable_fields(
2083 index: usize,
2084 trigger: &WorkflowBundleTrigger,
2085) -> Vec<WorkflowBundleEditableField> {
2086 let base = format!("/triggers/{index}");
2087 let mut fields = vec![
2088 editable_field(
2089 format!("trigger.{}.kind", trigger.id),
2090 "Trigger kind",
2091 format!("{base}/kind"),
2092 "enum",
2093 true,
2094 &["github", "cron", "delay", "manual", "webhook", "mcp"],
2095 ),
2096 editable_field(
2097 format!("trigger.{}.node_id", trigger.id),
2098 "Target node",
2099 format!("{base}/node_id"),
2100 "string",
2101 false,
2102 &[],
2103 ),
2104 ];
2105 if trigger.provider.is_some() || trigger.kind == "github" {
2106 fields.push(editable_field(
2107 format!("trigger.{}.provider", trigger.id),
2108 "Provider",
2109 format!("{base}/provider"),
2110 "string",
2111 trigger.kind == "github",
2112 &[],
2113 ));
2114 }
2115 for (field, label, value_type) in [
2116 ("events", "Events", "list"),
2117 ("schedule", "Schedule", "string"),
2118 ("delay", "Delay", "string"),
2119 ("webhook_path", "Webhook path", "string"),
2120 ("mcp_tool", "MCP tool", "string"),
2121 ("resume_key", "Resume key", "string"),
2122 ("metadata", "Metadata", "object"),
2123 ] {
2124 fields.push(editable_field(
2125 format!("trigger.{}.{}", trigger.id, field),
2126 label,
2127 format!("{base}/{field}"),
2128 value_type,
2129 false,
2130 &[],
2131 ));
2132 }
2133 fields
2134}
2135
2136fn workflow_node_editable_fields(
2137 node_id: &str,
2138 capsule_id: Option<&String>,
2139) -> Vec<WorkflowBundleEditableField> {
2140 let escaped_node = json_pointer_segment(node_id);
2141 let mut fields = vec![
2142 editable_field(
2143 format!("workflow.{node_id}.task_label"),
2144 "Task label",
2145 format!("/workflow/nodes/{escaped_node}/task_label"),
2146 "string",
2147 false,
2148 &[],
2149 ),
2150 editable_field(
2151 format!("workflow.{node_id}.prompt"),
2152 "Prompt",
2153 format!("/workflow/nodes/{escaped_node}/prompt"),
2154 "string",
2155 false,
2156 &[],
2157 ),
2158 editable_field(
2159 format!("workflow.{node_id}.system"),
2160 "System prompt",
2161 format!("/workflow/nodes/{escaped_node}/system"),
2162 "string",
2163 false,
2164 &[],
2165 ),
2166 editable_field(
2167 format!("workflow.{node_id}.model_policy"),
2168 "Model policy",
2169 format!("/workflow/nodes/{escaped_node}/model_policy"),
2170 "object",
2171 false,
2172 &[],
2173 ),
2174 editable_field(
2175 format!("workflow.{node_id}.tools"),
2176 "Tool policy",
2177 format!("/workflow/nodes/{escaped_node}/tools"),
2178 "any",
2179 false,
2180 &[],
2181 ),
2182 editable_field(
2183 format!("workflow.{node_id}.capability_policy"),
2184 "Capability policy",
2185 format!("/workflow/nodes/{escaped_node}/capability_policy"),
2186 "object",
2187 false,
2188 &[],
2189 ),
2190 editable_field(
2191 format!("workflow.{node_id}.approval_policy"),
2192 "Approval policy",
2193 format!("/workflow/nodes/{escaped_node}/approval_policy"),
2194 "object",
2195 false,
2196 &[],
2197 ),
2198 editable_field(
2199 format!("workflow.{node_id}.retry_policy"),
2200 "Retry policy",
2201 format!("/workflow/nodes/{escaped_node}/retry_policy"),
2202 "object",
2203 false,
2204 &[],
2205 ),
2206 ];
2207 if let Some(capsule_id) = capsule_id {
2208 let escaped_capsule = json_pointer_segment(capsule_id);
2209 fields.extend([
2210 editable_field(
2211 format!("prompt_capsule.{capsule_id}.prompt"),
2212 "Prompt capsule",
2213 format!("/prompt_capsules/{escaped_capsule}/prompt"),
2214 "string",
2215 true,
2216 &[],
2217 ),
2218 editable_field(
2219 format!("prompt_capsule.{capsule_id}.system"),
2220 "Prompt capsule system",
2221 format!("/prompt_capsules/{escaped_capsule}/system"),
2222 "string",
2223 false,
2224 &[],
2225 ),
2226 editable_field(
2227 format!("prompt_capsule.{capsule_id}.context"),
2228 "Prompt capsule context",
2229 format!("/prompt_capsules/{escaped_capsule}/context"),
2230 "object",
2231 false,
2232 &[],
2233 ),
2234 editable_field(
2235 format!("prompt_capsule.{capsule_id}.trigger_id"),
2236 "Prompt capsule trigger",
2237 format!("/prompt_capsules/{escaped_capsule}/trigger_id"),
2238 "string",
2239 false,
2240 &[],
2241 ),
2242 ]);
2243 }
2244 fields
2245}
2246
2247fn connector_editable_fields(
2248 index: usize,
2249 connector: &ConnectorRequirement,
2250) -> Vec<WorkflowBundleEditableField> {
2251 let base = format!("/connectors/{index}");
2252 [
2253 ("id", "Connector id", "string", true),
2254 ("provider_id", "Provider id", "string", true),
2255 ("scopes", "Scopes", "list", false),
2256 ("setup_required", "Setup required", "bool", false),
2257 ("status_required", "Status required", "bool", false),
2258 ]
2259 .into_iter()
2260 .map(|(field, label, value_type, required)| {
2261 editable_field(
2262 format!("connector.{}.{}", connector.id, field),
2263 label,
2264 format!("{base}/{field}"),
2265 value_type,
2266 required,
2267 &[],
2268 )
2269 })
2270 .collect()
2271}
2272
2273fn retry_editable_fields() -> Vec<WorkflowBundleEditableField> {
2274 vec![
2275 editable_field(
2276 "policy.retry.max_attempts",
2277 "Retry attempts",
2278 "/policy/retry/max_attempts",
2279 "integer",
2280 true,
2281 &[],
2282 ),
2283 editable_field(
2284 "policy.retry.backoff",
2285 "Retry backoff",
2286 "/policy/retry/backoff",
2287 "string",
2288 true,
2289 &[],
2290 ),
2291 ]
2292}
2293
2294fn catchup_editable_fields() -> Vec<WorkflowBundleEditableField> {
2295 vec![
2296 editable_field(
2297 "policy.catchup.mode",
2298 "Catchup mode",
2299 "/policy/catchup/mode",
2300 "enum",
2301 true,
2302 &["none", "latest", "all"],
2303 ),
2304 editable_field(
2305 "policy.catchup.max_events",
2306 "Catchup max events",
2307 "/policy/catchup/max_events",
2308 "integer",
2309 false,
2310 &[],
2311 ),
2312 ]
2313}
2314
2315fn render_workflow_bundle_mermaid(
2316 nodes: &[WorkflowBundleGraphNode],
2317 edges: &[WorkflowBundleGraphEdge],
2318) -> String {
2319 let mut lines = vec!["flowchart TD".to_string()];
2320 for node in nodes {
2321 lines.push(format!(
2322 " {}[\"{}\"]",
2323 mermaid_id(&node.id),
2324 mermaid_label(&format!("{}: {}", node.node_type, node.label))
2325 ));
2326 }
2327 for edge in edges {
2328 let label = edge
2329 .label
2330 .as_deref()
2331 .or(edge.branch.as_deref())
2332 .map(mermaid_label);
2333 match label {
2334 Some(label) if !label.is_empty() => lines.push(format!(
2335 " {} -->|{}| {}",
2336 mermaid_id(&edge.from),
2337 label,
2338 mermaid_id(&edge.to)
2339 )),
2340 _ => lines.push(format!(
2341 " {} --> {}",
2342 mermaid_id(&edge.from),
2343 mermaid_id(&edge.to)
2344 )),
2345 }
2346 }
2347 lines.join("\n")
2348}
2349
2350fn mermaid_id(value: &str) -> String {
2351 let digest = Sha256::digest(value.as_bytes());
2352 let suffix = digest
2353 .iter()
2354 .take(4)
2355 .map(|byte| format!("{byte:02x}"))
2356 .collect::<String>();
2357 let mut out = format!("n_{suffix}_");
2358 for ch in value.chars() {
2359 if ch.is_ascii_alphanumeric() {
2360 out.push(ch);
2361 } else {
2362 out.push('_');
2363 }
2364 }
2365 out
2366}
2367
2368fn mermaid_label(value: &str) -> String {
2369 value
2370 .replace('\\', "\\\\")
2371 .replace('"', "\\\"")
2372 .replace('\n', " ")
2373}
2374
2375fn triggers_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, Vec<String>> {
2376 let mut by_node: BTreeMap<String, Vec<String>> = BTreeMap::new();
2377 for trigger in &bundle.triggers {
2378 if let Some(node_id) = trigger.node_id.as_ref() {
2379 by_node
2380 .entry(node_id.clone())
2381 .or_default()
2382 .push(trigger.id.clone());
2383 }
2384 }
2385 by_node
2386}
2387
2388fn capsules_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, String> {
2389 bundle
2390 .prompt_capsules
2391 .iter()
2392 .map(|(id, capsule)| (capsule.node_id.clone(), id.clone()))
2393 .collect()
2394}
2395
2396fn execution_order(graph: &WorkflowGraph) -> Vec<String> {
2397 let outgoing =
2398 graph
2399 .edges
2400 .iter()
2401 .fold(BTreeMap::<String, Vec<String>>::new(), |mut acc, edge| {
2402 acc.entry(edge.from.clone())
2403 .or_default()
2404 .push(edge.to.clone());
2405 acc
2406 });
2407 let mut seen = BTreeSet::new();
2408 let mut queue = VecDeque::from([graph.entry.clone()]);
2409 let mut order = Vec::new();
2410 while let Some(node_id) = queue.pop_front() {
2411 if !graph.nodes.contains_key(&node_id) || !seen.insert(node_id.clone()) {
2412 continue;
2413 }
2414 order.push(node_id.clone());
2415 if let Some(next) = outgoing.get(&node_id) {
2416 let mut next = next.clone();
2417 next.sort();
2418 for child in next {
2419 queue.push_back(child);
2420 }
2421 }
2422 }
2423 order
2424}
2425
2426fn default_run_id(bundle: &WorkflowBundle, graph_digest: &str) -> String {
2427 let suffix = graph_digest
2428 .strip_prefix("sha256:")
2429 .unwrap_or(graph_digest)
2430 .chars()
2431 .take(12)
2432 .collect::<String>();
2433 format!("bundle_run_{}_{}", sanitize_id(&bundle.id), suffix)
2434}
2435
2436fn sanitize_id(value: &str) -> String {
2437 value
2438 .chars()
2439 .map(|ch| {
2440 if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' {
2441 ch
2442 } else {
2443 '_'
2444 }
2445 })
2446 .collect()
2447}
2448
2449fn push_error(
2450 report: &mut WorkflowBundleValidationReport,
2451 path: impl Into<String>,
2452 message: impl Into<String>,
2453 node_id: Option<String>,
2454) {
2455 report.errors.push(WorkflowBundleDiagnostic {
2456 severity: "error".to_string(),
2457 path: path.into(),
2458 message: message.into(),
2459 node_id,
2460 });
2461}
2462
2463fn push_warning(
2464 report: &mut WorkflowBundleValidationReport,
2465 path: impl Into<String>,
2466 message: impl Into<String>,
2467 node_id: Option<String>,
2468) {
2469 report.warnings.push(WorkflowBundleDiagnostic {
2470 severity: "warning".to_string(),
2471 path: path.into(),
2472 message: message.into(),
2473 node_id,
2474 });
2475}
2476
2477#[cfg(test)]
2478#[path = "workflow_bundle_tests.rs"]
2479mod workflow_bundle_tests;