1use std::collections::{BTreeMap, BTreeSet, VecDeque};
5use std::fs;
6use std::io::{Cursor, Read};
7use std::path::{Component, Path, PathBuf};
8
9use ed25519_dalek::{Signature, Verifier, VerifyingKey};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12
13use super::{validate_workflow, WorkflowEdge, WorkflowGraph};
14use crate::tool_annotations::ToolAnnotations;
15
16pub const WORKFLOW_BUNDLE_SCHEMA_VERSION: u32 = 2;
17pub const WORKFLOW_BUNDLE_RECEIPT_TYPE: &str = "harn.workflow_bundle.run";
18pub const HARNPACK_MANIFEST_PATH: &str = "harnpack.json";
19
20const DEFAULT_HARNPACK_FILE_MODE: u32 = 0o644;
21
22#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
23#[serde(default)]
24pub struct WorkflowBundle {
25 pub schema_version: u32,
26 pub entrypoint: PathBuf,
27 pub transitive_modules: Vec<ModuleEntry>,
28 pub stdlib_version: String,
29 pub harn_version: String,
30 pub provider_catalog_hash: String,
31 pub tool_manifest: Vec<ToolEntry>,
32 pub sbom: SBOMDoc,
33 pub signature: Option<Ed25519Signature>,
34 pub parent_trust_record_id: Option<String>,
35 pub id: String,
36 pub name: Option<String>,
37 pub version: String,
38 pub triggers: Vec<WorkflowBundleTrigger>,
39 pub workflow: WorkflowGraph,
40 pub prompt_capsules: BTreeMap<String, PromptCapsule>,
41 pub policy: WorkflowBundlePolicy,
42 pub connectors: Vec<ConnectorRequirement>,
43 pub environment: EnvironmentRequirements,
44 pub receipts: WorkflowBundleReplayMetadata,
45 pub metadata: BTreeMap<String, serde_json::Value>,
46}
47
48impl Default for WorkflowBundle {
49 fn default() -> Self {
50 Self {
51 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
52 entrypoint: PathBuf::new(),
53 transitive_modules: Vec::new(),
54 stdlib_version: env!("CARGO_PKG_VERSION").to_string(),
55 harn_version: env!("CARGO_PKG_VERSION").to_string(),
56 provider_catalog_hash: String::new(),
57 tool_manifest: Vec::new(),
58 sbom: SBOMDoc::default(),
59 signature: None,
60 parent_trust_record_id: None,
61 id: String::new(),
62 name: None,
63 version: String::new(),
64 triggers: Vec::new(),
65 workflow: WorkflowGraph::default(),
66 prompt_capsules: BTreeMap::new(),
67 policy: WorkflowBundlePolicy::default(),
68 connectors: Vec::new(),
69 environment: EnvironmentRequirements::default(),
70 receipts: WorkflowBundleReplayMetadata::default(),
71 metadata: BTreeMap::new(),
72 }
73 }
74}
75
76#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
77#[serde(default)]
78pub struct ModuleEntry {
79 pub path: PathBuf,
80 pub source_hash_blake3: String,
81 pub harnbc_hash_blake3: String,
82}
83
84#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
85#[serde(default)]
86pub struct ToolEntry {
87 pub name: String,
88 pub provider: Option<String>,
89 pub annotations: Option<ToolAnnotations>,
90 pub schema_hash_blake3: Option<String>,
91 pub metadata: BTreeMap<String, String>,
92}
93
94#[allow(clippy::upper_case_acronyms)]
95#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
96#[serde(default)]
97pub struct SBOMDoc {
98 pub format: String,
99 pub version: String,
100 pub packages: Vec<SBOMPackage>,
101 pub relationships: Vec<SBOMRelationship>,
102}
103
104#[allow(clippy::upper_case_acronyms)]
105#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
106#[serde(default)]
107pub struct SBOMPackage {
108 pub name: String,
109 pub version: Option<String>,
110 pub package_hash_blake3: Option<String>,
111 pub license: Option<String>,
112}
113
114#[allow(clippy::upper_case_acronyms)]
115#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
116#[serde(default)]
117pub struct SBOMRelationship {
118 pub from: String,
119 pub to: String,
120 pub relationship_type: String,
121}
122
123#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
124#[serde(default)]
125pub struct Ed25519Signature {
126 pub key_id: Option<String>,
127 pub public_key: String,
128 pub signature: String,
129 pub manifest_hash_blake3: String,
130 pub algorithm: String,
131}
132
133#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
134#[serde(default)]
135pub struct WorkflowBundleTrigger {
136 pub id: String,
137 pub kind: String,
138 pub provider: Option<String>,
139 pub events: Vec<String>,
140 pub schedule: Option<String>,
141 pub delay: Option<String>,
142 pub webhook_path: Option<String>,
143 pub mcp_tool: Option<String>,
144 pub resume_key: Option<String>,
145 pub node_id: Option<String>,
146 pub metadata: BTreeMap<String, String>,
147}
148
149#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
150#[serde(default)]
151pub struct PromptCapsule {
152 pub id: String,
153 pub node_id: String,
154 pub trigger_id: Option<String>,
155 pub prompt: String,
156 pub system: Option<String>,
157 pub context: BTreeMap<String, serde_json::Value>,
158}
159
160#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
161#[serde(default)]
162pub struct WorkflowBundlePolicy {
163 pub autonomy_tier: String,
164 pub tool_policy: BTreeMap<String, serde_json::Value>,
165 pub approval_required: Vec<String>,
166 pub retry: RetryPolicySpec,
167 pub catchup: CatchupPolicySpec,
168}
169
170impl Default for WorkflowBundlePolicy {
171 fn default() -> Self {
172 Self {
173 autonomy_tier: "act_with_approval".to_string(),
174 tool_policy: BTreeMap::new(),
175 approval_required: Vec::new(),
176 retry: RetryPolicySpec::default(),
177 catchup: CatchupPolicySpec::default(),
178 }
179 }
180}
181
182#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
183#[serde(default)]
184pub struct RetryPolicySpec {
185 pub max_attempts: u32,
186 pub backoff: String,
187}
188
189impl Default for RetryPolicySpec {
190 fn default() -> Self {
191 Self {
192 max_attempts: 1,
193 backoff: "none".to_string(),
194 }
195 }
196}
197
198#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
199#[serde(default)]
200pub struct CatchupPolicySpec {
201 pub mode: String,
202 pub max_events: Option<u32>,
203}
204
205impl Default for CatchupPolicySpec {
206 fn default() -> Self {
207 Self {
208 mode: "latest".to_string(),
209 max_events: Some(1),
210 }
211 }
212}
213
214#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
215#[serde(default)]
216pub struct ConnectorRequirement {
217 pub id: String,
218 pub provider_id: String,
219 pub scopes: Vec<String>,
220 pub setup_required: bool,
221 pub status_required: bool,
222}
223
224#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
225#[serde(default)]
226pub struct EnvironmentRequirements {
227 pub repo_setup_profile: Option<String>,
228 pub worktree_policy: String,
229 pub command_gates: Vec<String>,
230}
231
232impl Default for EnvironmentRequirements {
233 fn default() -> Self {
234 Self {
235 repo_setup_profile: None,
236 worktree_policy: "host_managed".to_string(),
237 command_gates: Vec::new(),
238 }
239 }
240}
241
242#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
243#[serde(default)]
244pub struct WorkflowBundleReplayMetadata {
245 pub run_id: Option<String>,
246 pub event_ids: Vec<String>,
247 pub workflow_version: Option<usize>,
248 pub graph_digest: Option<String>,
249}
250
251#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
252#[serde(default)]
253pub struct WorkflowBundleDiagnostic {
254 pub severity: String,
255 pub path: String,
256 pub message: String,
257 pub node_id: Option<String>,
258}
259
260#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
261#[serde(default)]
262pub struct WorkflowBundleValidationReport {
263 pub valid: bool,
264 pub bundle_id: String,
265 pub workflow_id: String,
266 pub graph_digest: String,
267 pub errors: Vec<WorkflowBundleDiagnostic>,
268 pub warnings: Vec<WorkflowBundleDiagnostic>,
269}
270
271#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
272pub struct WorkflowBundlePreview {
273 pub schema_version: u32,
274 pub bundle_id: String,
275 pub bundle_version: String,
276 pub workflow_id: String,
277 pub workflow_version: usize,
278 pub graph_digest: String,
279 pub validation: WorkflowBundleValidationReport,
280 pub graph: WorkflowBundleGraphExport,
281 pub mermaid: String,
282 pub triggers: Vec<WorkflowBundleTrigger>,
283 pub connectors: Vec<ConnectorRequirement>,
284 pub environment: EnvironmentRequirements,
285 pub nodes: Vec<WorkflowBundlePreviewNode>,
286 pub edges: Vec<WorkflowEdge>,
287}
288
289#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
290pub struct WorkflowBundlePreviewNode {
291 pub id: String,
292 pub kind: String,
293 pub label: Option<String>,
294 pub prompt_capsule: Option<String>,
295 pub trigger_ids: Vec<String>,
296 pub outgoing: Vec<String>,
297}
298
299#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
300pub struct WorkflowBundleGraphExport {
301 pub schema_version: u32,
302 pub graph_id: String,
303 pub graph_digest: String,
304 pub nodes: Vec<WorkflowBundleGraphNode>,
305 pub edges: Vec<WorkflowBundleGraphEdge>,
306 pub diagnostics: Vec<WorkflowBundleGraphDiagnostic>,
307 pub editable_fields: Vec<WorkflowBundleEditableField>,
308 pub mermaid: String,
309}
310
311#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
312pub struct WorkflowBundleGraphNode {
313 pub id: String,
314 pub node_type: String,
315 pub label: String,
316 pub workflow_node_id: Option<String>,
317 pub trigger_id: Option<String>,
318 pub connector_id: Option<String>,
319 pub editable_fields: Vec<WorkflowBundleEditableField>,
320 pub metadata: BTreeMap<String, serde_json::Value>,
321}
322
323#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
324pub struct WorkflowBundleGraphEdge {
325 pub from: String,
326 pub to: String,
327 pub label: Option<String>,
328 pub branch: Option<String>,
329}
330
331#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
332pub struct WorkflowBundleGraphDiagnostic {
333 pub severity: String,
334 pub path: String,
335 pub message: String,
336 pub node_id: Option<String>,
337 pub graph_node_id: Option<String>,
338}
339
340#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
341pub struct WorkflowBundleEditableField {
342 pub id: String,
343 pub label: String,
344 pub json_pointer: String,
345 pub value_type: String,
346 pub required: bool,
347 pub enum_values: Vec<String>,
348}
349
350#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
351#[serde(default)]
352pub struct WorkflowBundleRunRequest {
353 pub trigger_id: Option<String>,
354 pub event_id: Option<String>,
355}
356
357#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
358pub struct WorkflowBundleRunReceipt {
359 pub schema_version: u32,
360 pub receipt_type: String,
361 pub bundle_id: String,
362 pub bundle_version: String,
363 pub workflow_id: String,
364 pub workflow_version: usize,
365 pub graph_digest: String,
366 pub run_id: String,
367 pub trigger_id: Option<String>,
368 pub event_ids: Vec<String>,
369 pub status: String,
370 pub executed_nodes: Vec<WorkflowBundleRunNodeReceipt>,
371 pub policy: WorkflowBundlePolicy,
372 pub connectors: Vec<ConnectorRequirement>,
373 pub environment: EnvironmentRequirements,
374}
375
376#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
377pub struct WorkflowBundleRunNodeReceipt {
378 pub node_id: String,
379 pub kind: String,
380 pub prompt_capsule: Option<String>,
381 pub status: String,
382}
383
384#[derive(Clone, Debug, PartialEq, Eq)]
385pub struct HarnpackEntry {
386 pub path: PathBuf,
387 pub bytes: Vec<u8>,
388 pub mode: u32,
389}
390
391impl HarnpackEntry {
392 pub fn new(path: impl Into<PathBuf>, bytes: impl Into<Vec<u8>>) -> Self {
393 Self {
394 path: path.into(),
395 bytes: bytes.into(),
396 mode: DEFAULT_HARNPACK_FILE_MODE,
397 }
398 }
399
400 pub fn with_mode(mut self, mode: u32) -> Self {
401 self.mode = mode;
402 self
403 }
404}
405
406#[derive(Clone, Debug, PartialEq)]
407pub struct HarnpackArchive {
408 pub manifest: WorkflowBundle,
409 pub contents: Vec<HarnpackEntry>,
410}
411
412#[derive(Clone, Debug, PartialEq, Eq)]
413pub enum WorkflowBundleErrorKind {
414 Io,
415 Json,
416 MissingSchemaVersion,
417 UnsupportedSchemaVersion { actual: u32, expected: u32 },
418 InvalidArchive,
419 DuplicateArchiveEntry,
420 UnsafeArchivePath,
421 InvalidSignature,
422}
423
424#[derive(Clone, Debug, PartialEq, Eq)]
425pub struct WorkflowBundleError {
426 pub kind: WorkflowBundleErrorKind,
427 pub message: String,
428}
429
430impl WorkflowBundleError {
431 fn new(kind: WorkflowBundleErrorKind, message: impl Into<String>) -> Self {
432 Self {
433 kind,
434 message: message.into(),
435 }
436 }
437
438 fn unsupported_schema_version(actual: u32) -> Self {
439 Self::new(
440 WorkflowBundleErrorKind::UnsupportedSchemaVersion {
441 actual,
442 expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
443 },
444 format!(
445 "unsupported workflow bundle schema_version {actual}; expected {}",
446 WORKFLOW_BUNDLE_SCHEMA_VERSION
447 ),
448 )
449 }
450}
451
452impl std::fmt::Display for WorkflowBundleError {
453 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454 self.message.fmt(f)
455 }
456}
457
458impl std::error::Error for WorkflowBundleError {}
459
460impl From<std::io::Error> for WorkflowBundleError {
461 fn from(error: std::io::Error) -> Self {
462 Self::new(WorkflowBundleErrorKind::Io, error.to_string())
463 }
464}
465
466impl From<serde_json::Error> for WorkflowBundleError {
467 fn from(error: serde_json::Error) -> Self {
468 Self::new(WorkflowBundleErrorKind::Json, error.to_string())
469 }
470}
471
472pub fn load_workflow_bundle(path: &Path) -> Result<WorkflowBundle, WorkflowBundleError> {
473 let bytes = fs::read(path)?;
474 if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
475 read_harnpack(&bytes).map(|archive| archive.manifest)
476 } else {
477 parse_workflow_bundle_manifest(&bytes)
478 }
479}
480
481pub fn read_workflow_bundle_manifest_any_version(
487 bytes: &[u8],
488) -> Result<WorkflowBundle, WorkflowBundleError> {
489 let value: serde_json::Value = serde_json::from_slice(bytes)?;
490 if value.get("schema_version").is_none() {
491 return Err(WorkflowBundleError::new(
492 WorkflowBundleErrorKind::MissingSchemaVersion,
493 "workflow bundle manifest is missing schema_version",
494 ));
495 }
496 serde_json::from_value(value).map_err(Into::into)
497}
498
499pub fn load_workflow_bundle_any_version(
503 path: &Path,
504) -> Result<WorkflowBundle, WorkflowBundleError> {
505 let bytes = fs::read(path)?;
506 if path.extension().and_then(|extension| extension.to_str()) == Some("harnpack") {
507 let tar_bytes = zstd::stream::decode_all(Cursor::new(bytes))?;
508 let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
509 for entry in archive.entries()? {
510 let mut entry = entry?;
511 if entry.header().entry_type().is_dir() {
512 continue;
513 }
514 let path = normalize_archive_path(entry.path()?.as_ref())?;
515 if path == HARNPACK_MANIFEST_PATH {
516 let mut entry_bytes = Vec::new();
517 entry.read_to_end(&mut entry_bytes)?;
518 return read_workflow_bundle_manifest_any_version(&entry_bytes);
519 }
520 }
521 Err(WorkflowBundleError::new(
522 WorkflowBundleErrorKind::InvalidArchive,
523 format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
524 ))
525 } else {
526 read_workflow_bundle_manifest_any_version(&bytes)
527 }
528}
529
530pub fn parse_workflow_bundle_manifest(bytes: &[u8]) -> Result<WorkflowBundle, WorkflowBundleError> {
531 let value: serde_json::Value = serde_json::from_slice(bytes)?;
532 let schema_version = value
533 .get("schema_version")
534 .and_then(serde_json::Value::as_u64)
535 .ok_or_else(|| {
536 WorkflowBundleError::new(
537 WorkflowBundleErrorKind::MissingSchemaVersion,
538 "workflow bundle manifest is missing numeric schema_version",
539 )
540 })?;
541 let actual = u32::try_from(schema_version).map_err(|_| {
542 WorkflowBundleError::new(
543 WorkflowBundleErrorKind::UnsupportedSchemaVersion {
544 actual: u32::MAX,
545 expected: WORKFLOW_BUNDLE_SCHEMA_VERSION,
546 },
547 format!(
548 "unsupported workflow bundle schema_version {schema_version}; expected {}",
549 WORKFLOW_BUNDLE_SCHEMA_VERSION
550 ),
551 )
552 })?;
553 if actual != WORKFLOW_BUNDLE_SCHEMA_VERSION {
554 return Err(WorkflowBundleError::unsupported_schema_version(actual));
555 }
556 serde_json::from_value(value).map_err(Into::into)
557}
558
559pub fn canonical_workflow_bundle_manifest_bytes(
560 bundle: &WorkflowBundle,
561) -> Result<Vec<u8>, WorkflowBundleError> {
562 serde_json::to_vec(&canonical_workflow_bundle_manifest(bundle)).map_err(Into::into)
563}
564
565pub fn workflow_bundle_hash(
566 bundle: &WorkflowBundle,
567 contents: &[HarnpackEntry],
568) -> Result<String, WorkflowBundleError> {
569 let mut hasher = blake3::Hasher::new();
570 let mut canonical = canonical_workflow_bundle_manifest(bundle);
571 canonical.signature = None;
572 let manifest_bytes = serde_json::to_vec(&canonical)?;
573 hasher.update(&manifest_bytes);
574
575 let mut content_hashes = contents
576 .iter()
577 .map(|entry| blake3_hash_bytes(&entry.bytes))
578 .collect::<Vec<_>>();
579 content_hashes.sort();
580 for content_hash in content_hashes {
581 hasher.update(b"\n");
582 hasher.update(content_hash.as_bytes());
583 }
584
585 Ok(blake3_digest_string(hasher.finalize()))
586}
587
588pub fn verify_workflow_bundle_signature(
589 bundle: &WorkflowBundle,
590 contents: &[HarnpackEntry],
591) -> Result<(), WorkflowBundleError> {
592 let signature = bundle.signature.as_ref().ok_or_else(|| {
593 WorkflowBundleError::new(
594 WorkflowBundleErrorKind::InvalidSignature,
595 "workflow bundle is unsigned",
596 )
597 })?;
598 if signature.algorithm != "ed25519" {
599 return Err(WorkflowBundleError::new(
600 WorkflowBundleErrorKind::InvalidSignature,
601 format!(
602 "unsupported workflow bundle signature algorithm {}",
603 signature.algorithm
604 ),
605 ));
606 }
607 let expected_hash = workflow_bundle_hash(bundle, contents)?;
608 if signature.manifest_hash_blake3 != expected_hash {
609 return Err(WorkflowBundleError::new(
610 WorkflowBundleErrorKind::InvalidSignature,
611 format!(
612 "workflow bundle signature hash mismatch; expected {expected_hash}, found {}",
613 signature.manifest_hash_blake3
614 ),
615 ));
616 }
617 let public_key_bytes = decode_hex_exact::<32>(
618 "workflow bundle signature public_key",
619 &signature.public_key,
620 )?;
621 let signature_bytes =
622 decode_hex_exact::<64>("workflow bundle signature", &signature.signature)?;
623 let verifying_key = VerifyingKey::from_bytes(&public_key_bytes).map_err(|error| {
624 WorkflowBundleError::new(
625 WorkflowBundleErrorKind::InvalidSignature,
626 format!("workflow bundle signature public_key is invalid Ed25519: {error}"),
627 )
628 })?;
629 let ed25519_signature = Signature::from_bytes(&signature_bytes);
630 verifying_key
631 .verify(expected_hash.as_bytes(), &ed25519_signature)
632 .map_err(|error| {
633 WorkflowBundleError::new(
634 WorkflowBundleErrorKind::InvalidSignature,
635 format!("workflow bundle signature failed Ed25519 verification: {error}"),
636 )
637 })
638}
639
640pub fn build_harnpack(
641 bundle: &WorkflowBundle,
642 contents: &[HarnpackEntry],
643) -> Result<Vec<u8>, WorkflowBundleError> {
644 let manifest_bytes = canonical_workflow_bundle_manifest_bytes(bundle)?;
645 let mut entries = contents
646 .iter()
647 .map(|entry| {
648 normalize_archive_path(&entry.path).map(|path| (path, entry.bytes.clone(), entry.mode))
649 })
650 .collect::<Result<Vec<_>, _>>()?;
651 entries.sort_by(|left, right| left.0.cmp(&right.0));
652
653 let mut seen = BTreeSet::new();
654 for (path, _, _) in &entries {
655 if path == HARNPACK_MANIFEST_PATH {
656 return Err(WorkflowBundleError::new(
657 WorkflowBundleErrorKind::DuplicateArchiveEntry,
658 format!("archive content cannot replace {HARNPACK_MANIFEST_PATH}"),
659 ));
660 }
661 if !seen.insert(path.clone()) {
662 return Err(WorkflowBundleError::new(
663 WorkflowBundleErrorKind::DuplicateArchiveEntry,
664 format!("duplicate archive entry: {path}"),
665 ));
666 }
667 }
668
669 let mut tar_bytes = Vec::new();
670 {
671 let mut builder = tar::Builder::new(&mut tar_bytes);
672 append_harnpack_entry(
673 &mut builder,
674 HARNPACK_MANIFEST_PATH,
675 &manifest_bytes,
676 DEFAULT_HARNPACK_FILE_MODE,
677 )?;
678 for (path, bytes, mode) in entries {
679 append_harnpack_entry(&mut builder, &path, &bytes, mode)?;
680 }
681 builder.finish()?;
682 }
683
684 zstd::stream::encode_all(Cursor::new(tar_bytes), 0).map_err(Into::into)
685}
686
687pub fn read_harnpack(bytes: &[u8]) -> Result<HarnpackArchive, WorkflowBundleError> {
688 let tar_bytes = zstd::stream::decode_all(Cursor::new(bytes))?;
689 let mut archive = tar::Archive::new(Cursor::new(tar_bytes));
690 let mut manifest = None;
691 let mut contents = Vec::new();
692 let mut seen = BTreeSet::new();
693
694 for entry in archive.entries()? {
695 let mut entry = entry?;
696 if entry.header().entry_type().is_dir() {
697 continue;
698 }
699 let path = normalize_archive_path(entry.path()?.as_ref())?;
700 if !seen.insert(path.clone()) {
701 return Err(WorkflowBundleError::new(
702 WorkflowBundleErrorKind::DuplicateArchiveEntry,
703 format!("duplicate archive entry: {path}"),
704 ));
705 }
706 let mode = entry.header().mode().unwrap_or(DEFAULT_HARNPACK_FILE_MODE);
707 let mut entry_bytes = Vec::new();
708 entry.read_to_end(&mut entry_bytes)?;
709
710 if path == HARNPACK_MANIFEST_PATH {
711 manifest = Some(parse_workflow_bundle_manifest(&entry_bytes)?);
712 } else {
713 contents.push(HarnpackEntry {
714 path: PathBuf::from(path),
715 bytes: entry_bytes,
716 mode,
717 });
718 }
719 }
720
721 contents.sort_by(|left, right| left.path.cmp(&right.path));
722 Ok(HarnpackArchive {
723 manifest: manifest.ok_or_else(|| {
724 WorkflowBundleError::new(
725 WorkflowBundleErrorKind::InvalidArchive,
726 format!("harnpack archive is missing {HARNPACK_MANIFEST_PATH}"),
727 )
728 })?,
729 contents,
730 })
731}
732
733pub fn current_provider_catalog_hash_blake3() -> Result<String, WorkflowBundleError> {
734 let bytes = serde_json::to_vec(&crate::provider_catalog::artifact())?;
735 Ok(blake3_hash_bytes(&bytes))
736}
737
738pub fn workflow_graph_digest(graph: &WorkflowGraph) -> String {
739 let mut canonical = canonical_workflow_graph(graph);
740 canonical.audit_log.clear();
741 let bytes = serde_json::to_vec(&canonical).expect("workflow graph serializes");
742 let digest = Sha256::digest(bytes);
743 let hex = digest
744 .iter()
745 .map(|byte| format!("{byte:02x}"))
746 .collect::<String>();
747 format!("sha256:{hex}")
748}
749
750fn canonical_workflow_bundle_manifest(bundle: &WorkflowBundle) -> WorkflowBundle {
751 let mut canonical = bundle.clone();
752 canonical.workflow = canonical_workflow_graph(&bundle.workflow);
753 canonical.transitive_modules.sort_by(|left, right| {
754 (
755 path_sort_key(&left.path),
756 &left.source_hash_blake3,
757 &left.harnbc_hash_blake3,
758 )
759 .cmp(&(
760 path_sort_key(&right.path),
761 &right.source_hash_blake3,
762 &right.harnbc_hash_blake3,
763 ))
764 });
765 canonical.tool_manifest.sort_by(|left, right| {
766 (&left.name, &left.provider, &left.schema_hash_blake3).cmp(&(
767 &right.name,
768 &right.provider,
769 &right.schema_hash_blake3,
770 ))
771 });
772 canonical.sbom.packages.sort_by(|left, right| {
773 (&left.name, &left.version, &left.package_hash_blake3).cmp(&(
774 &right.name,
775 &right.version,
776 &right.package_hash_blake3,
777 ))
778 });
779 canonical.sbom.relationships.sort_by(|left, right| {
780 (&left.from, &left.to, &left.relationship_type).cmp(&(
781 &right.from,
782 &right.to,
783 &right.relationship_type,
784 ))
785 });
786 canonical
787}
788
789fn decode_hex_exact<const N: usize>(
790 label: &str,
791 value: &str,
792) -> Result<[u8; N], WorkflowBundleError> {
793 let bytes = hex::decode(value).map_err(|error| {
794 WorkflowBundleError::new(
795 WorkflowBundleErrorKind::InvalidSignature,
796 format!("{label} is not hex: {error}"),
797 )
798 })?;
799 bytes.try_into().map_err(|bytes: Vec<u8>| {
800 WorkflowBundleError::new(
801 WorkflowBundleErrorKind::InvalidSignature,
802 format!("{label} must decode to {N} bytes, got {}", bytes.len()),
803 )
804 })
805}
806
807fn append_harnpack_entry<W: std::io::Write>(
808 builder: &mut tar::Builder<W>,
809 path: &str,
810 bytes: &[u8],
811 mode: u32,
812) -> Result<(), WorkflowBundleError> {
813 let mut header = tar::Header::new_gnu();
814 header.set_path(path).map_err(|error| {
815 WorkflowBundleError::new(
816 WorkflowBundleErrorKind::UnsafeArchivePath,
817 format!("invalid archive path {path}: {error}"),
818 )
819 })?;
820 header.set_entry_type(tar::EntryType::Regular);
821 header.set_size(bytes.len() as u64);
822 header.set_mode(mode);
823 header.set_mtime(0);
824 header.set_uid(0);
825 header.set_gid(0);
826 header.set_cksum();
827 builder.append(&header, bytes)?;
828 Ok(())
829}
830
831fn normalize_archive_path(path: &Path) -> Result<String, WorkflowBundleError> {
832 let mut parts = Vec::new();
833 for component in path.components() {
834 match component {
835 Component::Normal(part) => {
836 let Some(part) = part.to_str() else {
837 return Err(WorkflowBundleError::new(
838 WorkflowBundleErrorKind::UnsafeArchivePath,
839 format!("archive path is not valid UTF-8: {}", path.display()),
840 ));
841 };
842 if part.is_empty() {
843 return Err(WorkflowBundleError::new(
844 WorkflowBundleErrorKind::UnsafeArchivePath,
845 "archive path contains an empty component",
846 ));
847 }
848 parts.push(part.to_string());
849 }
850 Component::CurDir => {}
851 Component::ParentDir | Component::RootDir | Component::Prefix(_) => {
852 return Err(WorkflowBundleError::new(
853 WorkflowBundleErrorKind::UnsafeArchivePath,
854 format!(
855 "archive path must be relative and contained: {}",
856 path.display()
857 ),
858 ));
859 }
860 }
861 }
862 if parts.is_empty() {
863 return Err(WorkflowBundleError::new(
864 WorkflowBundleErrorKind::UnsafeArchivePath,
865 "archive path is empty",
866 ));
867 }
868 Ok(parts.join("/"))
869}
870
871fn path_sort_key(path: &Path) -> String {
872 path.components()
873 .filter_map(|component| match component {
874 Component::Normal(part) => part.to_str().map(ToOwned::to_owned),
875 _ => None,
876 })
877 .collect::<Vec<_>>()
878 .join("/")
879}
880
881fn blake3_hash_bytes(bytes: &[u8]) -> String {
882 blake3_digest_string(blake3::hash(bytes))
883}
884
885fn blake3_digest_string(hash: blake3::Hash) -> String {
886 format!("blake3:{hash}")
887}
888
889pub fn validate_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundleValidationReport {
890 let canonical = canonical_workflow_graph(&bundle.workflow);
891 let mut report = WorkflowBundleValidationReport {
892 valid: true,
893 bundle_id: bundle.id.clone(),
894 workflow_id: canonical.id.clone(),
895 graph_digest: workflow_graph_digest(&canonical),
896 errors: Vec::new(),
897 warnings: Vec::new(),
898 };
899
900 validate_manifest_contract(bundle, &mut report);
901 validate_bundle_identity(bundle, &canonical, &mut report);
902 validate_triggers(bundle, &canonical, &mut report);
903 validate_prompt_capsules(bundle, &canonical, &mut report);
904 validate_policy(bundle, &mut report);
905 validate_connectors(bundle, &mut report);
906 validate_environment(bundle, &mut report);
907
908 let graph_report = validate_workflow(&canonical, None);
909 for error in graph_report.errors {
910 let node_id = workflow_diagnostic_node_id(&error, &canonical);
911 push_error(&mut report, "workflow", error, node_id);
912 }
913 for warning in graph_report.warnings {
914 let node_id = workflow_diagnostic_node_id(&warning, &canonical);
915 push_warning(&mut report, "workflow", warning, node_id);
916 }
917
918 if let Some(expected) = bundle.receipts.graph_digest.as_deref() {
919 if expected != report.graph_digest {
920 let actual = report.graph_digest.clone();
921 push_error(
922 &mut report,
923 "receipts.graph_digest",
924 format!("graph digest mismatch: expected {expected}, computed {actual}"),
925 None,
926 );
927 }
928 }
929 if let Some(expected_version) = bundle.receipts.workflow_version {
930 if expected_version != canonical.version {
931 push_error(
932 &mut report,
933 "receipts.workflow_version",
934 format!(
935 "workflow version mismatch: expected {expected_version}, computed {}",
936 canonical.version
937 ),
938 None,
939 );
940 }
941 }
942
943 report.valid = report.errors.is_empty();
944 report
945}
946
947pub fn preview_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundlePreview {
948 let canonical = canonical_workflow_graph(&bundle.workflow);
949 let validation = validate_workflow_bundle(bundle);
950 let graph = export_workflow_bundle_graph(bundle, &validation);
951 let mermaid = graph.mermaid.clone();
952 let triggers_by_node = triggers_by_node(bundle);
953 let capsules_by_node = capsules_by_node(bundle);
954 let mut nodes = Vec::new();
955
956 for (node_id, node) in &canonical.nodes {
957 let mut outgoing = canonical
958 .edges
959 .iter()
960 .filter(|edge| edge.from == *node_id)
961 .map(|edge| edge.to.clone())
962 .collect::<Vec<_>>();
963 outgoing.sort();
964 outgoing.dedup();
965 nodes.push(WorkflowBundlePreviewNode {
966 id: node_id.clone(),
967 kind: node.kind.clone(),
968 label: node.task_label.clone(),
969 prompt_capsule: capsules_by_node.get(node_id).cloned(),
970 trigger_ids: triggers_by_node.get(node_id).cloned().unwrap_or_default(),
971 outgoing,
972 });
973 }
974
975 WorkflowBundlePreview {
976 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
977 bundle_id: bundle.id.clone(),
978 bundle_version: bundle.version.clone(),
979 workflow_id: canonical.id.clone(),
980 workflow_version: canonical.version,
981 graph_digest: validation.graph_digest.clone(),
982 validation,
983 graph,
984 mermaid,
985 triggers: bundle.triggers.clone(),
986 connectors: bundle.connectors.clone(),
987 environment: bundle.environment.clone(),
988 nodes,
989 edges: sorted_edges(&canonical),
990 }
991}
992
993pub fn export_workflow_bundle_graph(
994 bundle: &WorkflowBundle,
995 validation: &WorkflowBundleValidationReport,
996) -> WorkflowBundleGraphExport {
997 let canonical = canonical_workflow_graph(&bundle.workflow);
998 let mut nodes = Vec::new();
999 let mut edges = Vec::new();
1000 let mut editable_fields = Vec::new();
1001 let capsules_by_node = capsules_by_node(bundle);
1002 let catchup_enabled = bundle.policy.catchup.mode != "none";
1003 let retry_can_dlq = bundle.policy.retry.max_attempts > 1;
1004
1005 for (index, connector) in bundle.connectors.iter().enumerate() {
1006 let node_fields = connector_editable_fields(index, connector);
1007 editable_fields.extend(node_fields.clone());
1008 nodes.push(WorkflowBundleGraphNode {
1009 id: connector_graph_id(&connector.id),
1010 node_type: "connector_call".to_string(),
1011 label: connector_label(connector),
1012 workflow_node_id: None,
1013 trigger_id: None,
1014 connector_id: Some(connector.id.clone()),
1015 editable_fields: node_fields,
1016 metadata: BTreeMap::from([
1017 (
1018 "provider_id".to_string(),
1019 serde_json::json!(connector.provider_id),
1020 ),
1021 ("scopes".to_string(), serde_json::json!(connector.scopes)),
1022 ]),
1023 });
1024 }
1025
1026 let catchup_fields = catchup_editable_fields();
1027 let retry_fields = retry_editable_fields();
1028 if catchup_enabled {
1029 let node_fields = catchup_fields.clone();
1030 editable_fields.extend(node_fields.clone());
1031 nodes.push(WorkflowBundleGraphNode {
1032 id: catchup_graph_id(),
1033 node_type: "catchup".to_string(),
1034 label: "Catch up".to_string(),
1035 workflow_node_id: None,
1036 trigger_id: None,
1037 connector_id: None,
1038 editable_fields: node_fields,
1039 metadata: BTreeMap::from([(
1040 "mode".to_string(),
1041 serde_json::json!(bundle.policy.catchup.mode),
1042 )]),
1043 });
1044 } else {
1045 editable_fields.extend(catchup_fields);
1046 }
1047 if retry_can_dlq {
1048 let node_fields = retry_fields.clone();
1049 editable_fields.extend(node_fields.clone());
1050 nodes.push(WorkflowBundleGraphNode {
1051 id: dlq_graph_id(),
1052 node_type: "dlq".to_string(),
1053 label: "Dead letter queue".to_string(),
1054 workflow_node_id: None,
1055 trigger_id: None,
1056 connector_id: None,
1057 editable_fields: node_fields,
1058 metadata: BTreeMap::from([(
1059 "max_attempts".to_string(),
1060 serde_json::json!(bundle.policy.retry.max_attempts),
1061 )]),
1062 });
1063 } else {
1064 editable_fields.extend(retry_fields);
1065 }
1066
1067 for (index, trigger) in bundle.triggers.iter().enumerate() {
1068 let node_fields = trigger_editable_fields(index, trigger);
1069 editable_fields.extend(node_fields.clone());
1070 nodes.push(WorkflowBundleGraphNode {
1071 id: trigger_graph_id(&trigger.id),
1072 node_type: "trigger".to_string(),
1073 label: trigger_label(trigger),
1074 workflow_node_id: trigger.node_id.clone(),
1075 trigger_id: Some(trigger.id.clone()),
1076 connector_id: None,
1077 editable_fields: node_fields,
1078 metadata: BTreeMap::from([
1079 ("kind".to_string(), serde_json::json!(trigger.kind)),
1080 ("provider".to_string(), serde_json::json!(trigger.provider)),
1081 ("events".to_string(), serde_json::json!(trigger.events)),
1082 ]),
1083 });
1084 if let Some(provider) = trigger.provider.as_deref() {
1085 if let Some(connector) = bundle
1086 .connectors
1087 .iter()
1088 .find(|connector| connector.provider_id == provider || connector.id == provider)
1089 {
1090 edges.push(WorkflowBundleGraphEdge {
1091 from: connector_graph_id(&connector.id),
1092 to: trigger_graph_id(&trigger.id),
1093 label: Some("binds".to_string()),
1094 branch: None,
1095 });
1096 }
1097 }
1098 let target = trigger
1099 .node_id
1100 .clone()
1101 .unwrap_or_else(|| canonical.entry.clone());
1102 if catchup_enabled {
1103 edges.push(WorkflowBundleGraphEdge {
1104 from: trigger_graph_id(&trigger.id),
1105 to: catchup_graph_id(),
1106 label: Some(bundle.policy.catchup.mode.clone()),
1107 branch: Some("catchup".to_string()),
1108 });
1109 edges.push(WorkflowBundleGraphEdge {
1110 from: catchup_graph_id(),
1111 to: workflow_graph_id(&target),
1112 label: Some("dispatch".to_string()),
1113 branch: None,
1114 });
1115 } else {
1116 edges.push(WorkflowBundleGraphEdge {
1117 from: trigger_graph_id(&trigger.id),
1118 to: workflow_graph_id(&target),
1119 label: Some("dispatch".to_string()),
1120 branch: None,
1121 });
1122 }
1123 }
1124
1125 for (node_id, node) in &canonical.nodes {
1126 let capsule_id = capsules_by_node.get(node_id);
1127 let node_fields = workflow_node_editable_fields(node_id, capsule_id);
1128 editable_fields.extend(node_fields.clone());
1129 nodes.push(WorkflowBundleGraphNode {
1130 id: workflow_graph_id(node_id),
1131 node_type: workflow_node_type(&node.kind),
1132 label: workflow_node_label(node_id, node),
1133 workflow_node_id: Some(node_id.clone()),
1134 trigger_id: None,
1135 connector_id: None,
1136 editable_fields: node_fields,
1137 metadata: BTreeMap::from([
1138 ("kind".to_string(), serde_json::json!(node.kind)),
1139 ("task_label".to_string(), serde_json::json!(node.task_label)),
1140 (
1141 "prompt_capsule".to_string(),
1142 serde_json::json!(capsule_id.cloned()),
1143 ),
1144 ]),
1145 });
1146 }
1147
1148 for edge in sorted_edges(&canonical) {
1149 edges.push(WorkflowBundleGraphEdge {
1150 from: workflow_graph_id(&edge.from),
1151 to: workflow_graph_id(&edge.to),
1152 label: edge.label.clone(),
1153 branch: edge.branch.clone(),
1154 });
1155 }
1156
1157 let outgoing: BTreeSet<&str> = canonical
1158 .edges
1159 .iter()
1160 .map(|edge| edge.from.as_str())
1161 .collect();
1162 for node_id in canonical.nodes.keys() {
1163 if !outgoing.contains(node_id.as_str()) {
1164 edges.push(WorkflowBundleGraphEdge {
1165 from: workflow_graph_id(node_id),
1166 to: terminal_completed_graph_id(),
1167 label: Some("completed".to_string()),
1168 branch: Some("completed".to_string()),
1169 });
1170 }
1171 if retry_can_dlq {
1172 edges.push(WorkflowBundleGraphEdge {
1173 from: workflow_graph_id(node_id),
1174 to: dlq_graph_id(),
1175 label: Some("retry exhausted".to_string()),
1176 branch: Some("failed".to_string()),
1177 });
1178 }
1179 }
1180
1181 nodes.push(WorkflowBundleGraphNode {
1182 id: terminal_completed_graph_id(),
1183 node_type: "terminal".to_string(),
1184 label: "Completed".to_string(),
1185 workflow_node_id: None,
1186 trigger_id: None,
1187 connector_id: None,
1188 editable_fields: Vec::new(),
1189 metadata: BTreeMap::from([("status".to_string(), serde_json::json!("completed"))]),
1190 });
1191 nodes.push(WorkflowBundleGraphNode {
1192 id: terminal_failed_graph_id(),
1193 node_type: "terminal".to_string(),
1194 label: "Failed".to_string(),
1195 workflow_node_id: None,
1196 trigger_id: None,
1197 connector_id: None,
1198 editable_fields: Vec::new(),
1199 metadata: BTreeMap::from([("status".to_string(), serde_json::json!("failed"))]),
1200 });
1201 if retry_can_dlq {
1202 edges.push(WorkflowBundleGraphEdge {
1203 from: dlq_graph_id(),
1204 to: terminal_failed_graph_id(),
1205 label: Some("failed".to_string()),
1206 branch: Some("failed".to_string()),
1207 });
1208 }
1209
1210 nodes.sort_by(|left, right| left.id.cmp(&right.id));
1211 edges.sort_by(|left, right| {
1212 (&left.from, &left.to, &left.branch, &left.label).cmp(&(
1213 &right.from,
1214 &right.to,
1215 &right.branch,
1216 &right.label,
1217 ))
1218 });
1219 editable_fields.sort_by(|left, right| left.id.cmp(&right.id));
1220
1221 let diagnostics = validation
1222 .errors
1223 .iter()
1224 .chain(validation.warnings.iter())
1225 .map(|diagnostic| WorkflowBundleGraphDiagnostic {
1226 severity: diagnostic.severity.clone(),
1227 path: diagnostic.path.clone(),
1228 message: diagnostic.message.clone(),
1229 node_id: diagnostic.node_id.clone(),
1230 graph_node_id: diagnostic.node_id.as_deref().map(workflow_graph_id),
1231 })
1232 .collect::<Vec<_>>();
1233 let mermaid = render_workflow_bundle_mermaid(&nodes, &edges);
1234
1235 WorkflowBundleGraphExport {
1236 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1237 graph_id: canonical.id,
1238 graph_digest: validation.graph_digest.clone(),
1239 nodes,
1240 edges,
1241 diagnostics,
1242 editable_fields,
1243 mermaid,
1244 }
1245}
1246
1247pub fn run_workflow_bundle(
1248 bundle: &WorkflowBundle,
1249 request: WorkflowBundleRunRequest,
1250) -> Result<WorkflowBundleRunReceipt, WorkflowBundleValidationReport> {
1251 let validation = validate_workflow_bundle(bundle);
1252 if !validation.valid {
1253 return Err(validation);
1254 }
1255
1256 let canonical = canonical_workflow_graph(&bundle.workflow);
1257 let trigger_id = match request.trigger_id {
1258 Some(trigger_id)
1259 if !bundle
1260 .triggers
1261 .iter()
1262 .any(|trigger| trigger.id == trigger_id) =>
1263 {
1264 let mut report = validation;
1265 push_error(
1266 &mut report,
1267 "trigger_id",
1268 format!("unknown trigger id: {trigger_id}"),
1269 None,
1270 );
1271 report.valid = false;
1272 return Err(report);
1273 }
1274 Some(trigger_id) => Some(trigger_id),
1275 None => bundle.triggers.first().map(|trigger| trigger.id.clone()),
1276 };
1277 let mut event_ids = bundle.receipts.event_ids.clone();
1278 if let Some(event_id) = request.event_id {
1279 if !event_ids.contains(&event_id) {
1280 event_ids.push(event_id);
1281 }
1282 }
1283 let run_id = bundle
1284 .receipts
1285 .run_id
1286 .clone()
1287 .unwrap_or_else(|| default_run_id(bundle, &validation.graph_digest));
1288 let capsules_by_node = capsules_by_node(bundle);
1289 let executed_nodes = execution_order(&canonical)
1290 .into_iter()
1291 .map(|node_id| {
1292 let node = canonical
1293 .nodes
1294 .get(&node_id)
1295 .expect("execution order only contains known nodes");
1296 WorkflowBundleRunNodeReceipt {
1297 node_id: node_id.clone(),
1298 kind: node.kind.clone(),
1299 prompt_capsule: capsules_by_node.get(&node_id).cloned(),
1300 status: "completed".to_string(),
1301 }
1302 })
1303 .collect();
1304
1305 Ok(WorkflowBundleRunReceipt {
1306 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
1307 receipt_type: WORKFLOW_BUNDLE_RECEIPT_TYPE.to_string(),
1308 bundle_id: bundle.id.clone(),
1309 bundle_version: bundle.version.clone(),
1310 workflow_id: canonical.id,
1311 workflow_version: canonical.version,
1312 graph_digest: validation.graph_digest,
1313 run_id,
1314 trigger_id,
1315 event_ids,
1316 status: "completed".to_string(),
1317 executed_nodes,
1318 policy: bundle.policy.clone(),
1319 connectors: bundle.connectors.clone(),
1320 environment: bundle.environment.clone(),
1321 })
1322}
1323
1324fn canonical_workflow_graph(graph: &WorkflowGraph) -> WorkflowGraph {
1325 let mut canonical = graph.clone();
1326 if canonical.type_name.is_empty() {
1327 canonical.type_name = "workflow_graph".to_string();
1328 }
1329 if canonical.version == 0 {
1330 canonical.version = 1;
1331 }
1332 if canonical.entry.is_empty() {
1333 canonical.entry = canonical.nodes.keys().next().cloned().unwrap_or_default();
1334 }
1335 for (node_id, node) in &mut canonical.nodes {
1336 if node.id.is_none() {
1337 node.id = Some(node_id.clone());
1338 }
1339 if node.kind.is_empty() {
1340 node.kind = "stage".to_string();
1341 }
1342 if node.retry_policy.max_attempts == 0 {
1343 node.retry_policy.max_attempts = 1;
1344 }
1345 }
1346 canonical.edges = sorted_edges(&canonical);
1347 canonical
1348}
1349
1350fn sorted_edges(graph: &WorkflowGraph) -> Vec<WorkflowEdge> {
1351 let mut edges = graph.edges.clone();
1352 edges.sort_by(|left, right| {
1353 (
1354 &left.from,
1355 &left.to,
1356 left.branch.as_deref(),
1357 left.label.as_deref(),
1358 )
1359 .cmp(&(
1360 &right.from,
1361 &right.to,
1362 right.branch.as_deref(),
1363 right.label.as_deref(),
1364 ))
1365 });
1366 edges
1367}
1368
1369fn validate_bundle_identity(
1370 bundle: &WorkflowBundle,
1371 graph: &WorkflowGraph,
1372 report: &mut WorkflowBundleValidationReport,
1373) {
1374 if bundle.schema_version != WORKFLOW_BUNDLE_SCHEMA_VERSION {
1375 push_error(
1376 report,
1377 "schema_version",
1378 format!(
1379 "unsupported schema_version {}; expected {}",
1380 bundle.schema_version, WORKFLOW_BUNDLE_SCHEMA_VERSION
1381 ),
1382 None,
1383 );
1384 }
1385 if bundle.id.trim().is_empty() {
1386 push_error(report, "id", "bundle id is required", None);
1387 }
1388 if bundle.version.trim().is_empty() {
1389 push_error(report, "version", "bundle version is required", None);
1390 }
1391 if graph.id.trim().is_empty() {
1392 push_error(
1393 report,
1394 "workflow.id",
1395 "workflow id is required for portable bundles",
1396 None,
1397 );
1398 }
1399 if graph.nodes.is_empty() {
1400 push_error(
1401 report,
1402 "workflow.nodes",
1403 "workflow must contain nodes",
1404 None,
1405 );
1406 }
1407 for (node_id, node) in &graph.nodes {
1408 if node_id.trim().is_empty() {
1409 push_error(report, "workflow.nodes", "node id is required", None);
1410 }
1411 if node.id.as_deref().is_some_and(|id| id != node_id) {
1412 push_error(
1413 report,
1414 format!("workflow.nodes.{node_id}.id"),
1415 "node id field must match its map key",
1416 Some(node_id.clone()),
1417 );
1418 }
1419 }
1420}
1421
1422fn validate_manifest_contract(
1423 bundle: &WorkflowBundle,
1424 report: &mut WorkflowBundleValidationReport,
1425) {
1426 validate_relative_path(
1427 report,
1428 "entrypoint",
1429 &bundle.entrypoint,
1430 "entrypoint is required",
1431 );
1432 if bundle.transitive_modules.is_empty() {
1433 push_error(
1434 report,
1435 "transitive_modules",
1436 "at least one transitive module entry is required",
1437 None,
1438 );
1439 }
1440 let mut module_paths = BTreeSet::new();
1441 for (index, module) in bundle.transitive_modules.iter().enumerate() {
1442 let path = format!("transitive_modules[{index}]");
1443 validate_relative_path(
1444 report,
1445 format!("{path}.path"),
1446 &module.path,
1447 "module path is required",
1448 );
1449 if !module.path.as_os_str().is_empty() && !module_paths.insert(path_sort_key(&module.path))
1450 {
1451 push_error(
1452 report,
1453 format!("{path}.path"),
1454 format!(
1455 "duplicate transitive module path: {}",
1456 module.path.display()
1457 ),
1458 None,
1459 );
1460 }
1461 validate_blake3_hash(
1462 report,
1463 format!("{path}.source_hash_blake3"),
1464 &module.source_hash_blake3,
1465 true,
1466 );
1467 validate_blake3_hash(
1468 report,
1469 format!("{path}.harnbc_hash_blake3"),
1470 &module.harnbc_hash_blake3,
1471 true,
1472 );
1473 }
1474 if bundle.stdlib_version.trim().is_empty() {
1475 push_error(report, "stdlib_version", "stdlib_version is required", None);
1476 }
1477 if bundle.harn_version.trim().is_empty() {
1478 push_error(report, "harn_version", "harn_version is required", None);
1479 }
1480 validate_blake3_hash(
1481 report,
1482 "provider_catalog_hash",
1483 &bundle.provider_catalog_hash,
1484 true,
1485 );
1486
1487 let mut tool_names = BTreeSet::new();
1488 for (index, tool) in bundle.tool_manifest.iter().enumerate() {
1489 let path = format!("tool_manifest[{index}]");
1490 if tool.name.trim().is_empty() {
1491 push_error(
1492 report,
1493 format!("{path}.name"),
1494 "tool manifest entry name is required",
1495 None,
1496 );
1497 } else if !tool_names.insert(tool.name.clone()) {
1498 push_error(
1499 report,
1500 format!("{path}.name"),
1501 format!("duplicate tool manifest entry: {}", tool.name),
1502 None,
1503 );
1504 }
1505 if let Some(hash) = tool.schema_hash_blake3.as_deref() {
1506 validate_blake3_hash(report, format!("{path}.schema_hash_blake3"), hash, true);
1507 }
1508 }
1509
1510 if bundle.sbom.format.trim().is_empty() {
1511 push_error(report, "sbom.format", "SBOM format is required", None);
1512 }
1513 if bundle.sbom.version.trim().is_empty() {
1514 push_error(report, "sbom.version", "SBOM version is required", None);
1515 }
1516 let mut sbom_packages = BTreeSet::new();
1517 for (index, package) in bundle.sbom.packages.iter().enumerate() {
1518 let path = format!("sbom.packages[{index}]");
1519 if package.name.trim().is_empty() {
1520 push_error(
1521 report,
1522 format!("{path}.name"),
1523 "SBOM package name is required",
1524 None,
1525 );
1526 } else if !sbom_packages.insert(package.name.clone()) {
1527 push_error(
1528 report,
1529 format!("{path}.name"),
1530 format!("duplicate SBOM package: {}", package.name),
1531 None,
1532 );
1533 }
1534 if let Some(hash) = package.package_hash_blake3.as_deref() {
1535 validate_blake3_hash(report, format!("{path}.package_hash_blake3"), hash, true);
1536 }
1537 }
1538 for (index, relationship) in bundle.sbom.relationships.iter().enumerate() {
1539 let path = format!("sbom.relationships[{index}]");
1540 if relationship.from.trim().is_empty() {
1541 push_error(
1542 report,
1543 format!("{path}.from"),
1544 "SBOM relationship source is required",
1545 None,
1546 );
1547 }
1548 if relationship.to.trim().is_empty() {
1549 push_error(
1550 report,
1551 format!("{path}.to"),
1552 "SBOM relationship target is required",
1553 None,
1554 );
1555 }
1556 if relationship.relationship_type.trim().is_empty() {
1557 push_error(
1558 report,
1559 format!("{path}.relationship_type"),
1560 "SBOM relationship type is required",
1561 None,
1562 );
1563 }
1564 }
1565
1566 if let Some(parent_id) = bundle.parent_trust_record_id.as_deref() {
1567 if parent_id.trim().is_empty() {
1568 push_error(
1569 report,
1570 "parent_trust_record_id",
1571 "parent_trust_record_id cannot be empty when present",
1572 None,
1573 );
1574 }
1575 }
1576 if let Some(signature) = &bundle.signature {
1577 if signature.algorithm.trim().is_empty() {
1578 push_error(
1579 report,
1580 "signature.algorithm",
1581 "signature algorithm is required",
1582 None,
1583 );
1584 } else if signature.algorithm != "ed25519" {
1585 push_error(
1586 report,
1587 "signature.algorithm",
1588 "signature algorithm must be ed25519",
1589 None,
1590 );
1591 }
1592 if signature.public_key.trim().is_empty() {
1593 push_error(
1594 report,
1595 "signature.public_key",
1596 "signature public_key is required",
1597 None,
1598 );
1599 }
1600 if signature.signature.trim().is_empty() {
1601 push_error(
1602 report,
1603 "signature.signature",
1604 "signature value is required",
1605 None,
1606 );
1607 }
1608 validate_blake3_hash(
1609 report,
1610 "signature.manifest_hash_blake3",
1611 &signature.manifest_hash_blake3,
1612 true,
1613 );
1614 }
1615}
1616
1617fn validate_relative_path(
1618 report: &mut WorkflowBundleValidationReport,
1619 path: impl Into<String>,
1620 value: &Path,
1621 empty_message: &str,
1622) {
1623 let path = path.into();
1624 if value.as_os_str().is_empty() {
1625 push_error(report, path, empty_message, None);
1626 return;
1627 }
1628 if normalize_archive_path(value).is_err() {
1629 push_error(
1630 report,
1631 path,
1632 format!("path must be relative and contained: {}", value.display()),
1633 None,
1634 );
1635 }
1636}
1637
1638fn validate_blake3_hash(
1639 report: &mut WorkflowBundleValidationReport,
1640 path: impl Into<String>,
1641 value: &str,
1642 required: bool,
1643) {
1644 let path = path.into();
1645 if value.trim().is_empty() {
1646 if required {
1647 push_error(report, path, "BLAKE3 hash is required", None);
1648 }
1649 return;
1650 }
1651 let Some(hex) = value.strip_prefix("blake3:") else {
1652 push_error(report, path, "BLAKE3 hash must use blake3:<hex>", None);
1653 return;
1654 };
1655 if hex.len() != 64
1656 || !hex
1657 .bytes()
1658 .all(|byte| byte.is_ascii_digit() || matches!(byte, b'a'..=b'f'))
1659 {
1660 push_error(
1661 report,
1662 path,
1663 "BLAKE3 hash must contain 64 lowercase hex digits",
1664 None,
1665 );
1666 }
1667}
1668
1669fn validate_triggers(
1670 bundle: &WorkflowBundle,
1671 graph: &WorkflowGraph,
1672 report: &mut WorkflowBundleValidationReport,
1673) {
1674 if bundle.triggers.is_empty() {
1675 push_warning(report, "triggers", "bundle declares no triggers", None);
1676 }
1677 let mut ids = BTreeSet::new();
1678 for (index, trigger) in bundle.triggers.iter().enumerate() {
1679 let path = format!("triggers[{index}]");
1680 if trigger.id.trim().is_empty() {
1681 push_error(report, format!("{path}.id"), "trigger id is required", None);
1682 } else if !ids.insert(trigger.id.clone()) {
1683 push_error(
1684 report,
1685 format!("{path}.id"),
1686 format!("duplicate trigger id: {}", trigger.id),
1687 None,
1688 );
1689 }
1690 match trigger.kind.as_str() {
1691 "github" => {
1692 if trigger.provider.as_deref() != Some("github") {
1693 push_error(
1694 report,
1695 format!("{path}.provider"),
1696 "github triggers require provider=\"github\"",
1697 None,
1698 );
1699 }
1700 if trigger.events.is_empty() {
1701 push_error(
1702 report,
1703 format!("{path}.events"),
1704 "github triggers require at least one event",
1705 None,
1706 );
1707 }
1708 }
1709 "cron" if trigger.schedule.is_none() => push_error(
1710 report,
1711 format!("{path}.schedule"),
1712 "cron triggers require schedule",
1713 None,
1714 ),
1715 "delay" if trigger.delay.is_none() => push_error(
1716 report,
1717 format!("{path}.delay"),
1718 "delay triggers require delay",
1719 None,
1720 ),
1721 "webhook" if trigger.webhook_path.is_none() => push_error(
1722 report,
1723 format!("{path}.webhook_path"),
1724 "webhook triggers require webhook_path",
1725 None,
1726 ),
1727 "mcp" if trigger.mcp_tool.is_none() => push_error(
1728 report,
1729 format!("{path}.mcp_tool"),
1730 "mcp triggers require mcp_tool",
1731 None,
1732 ),
1733 "manual" => {}
1734 "" => push_error(
1735 report,
1736 format!("{path}.kind"),
1737 "trigger kind is required",
1738 None,
1739 ),
1740 other
1741 if !matches!(
1742 other,
1743 "github" | "cron" | "delay" | "webhook" | "mcp" | "manual"
1744 ) =>
1745 {
1746 push_error(
1747 report,
1748 format!("{path}.kind"),
1749 format!("unsupported trigger kind: {other}"),
1750 None,
1751 );
1752 }
1753 _ => {}
1754 }
1755 if let Some(node_id) = trigger.node_id.as_deref() {
1756 if !graph.nodes.contains_key(node_id) {
1757 push_error(
1758 report,
1759 format!("{path}.node_id"),
1760 format!("trigger references unknown node: {node_id}"),
1761 Some(node_id.to_string()),
1762 );
1763 }
1764 }
1765 }
1766}
1767
1768fn validate_prompt_capsules(
1769 bundle: &WorkflowBundle,
1770 graph: &WorkflowGraph,
1771 report: &mut WorkflowBundleValidationReport,
1772) {
1773 let trigger_ids: BTreeSet<&str> = bundle
1774 .triggers
1775 .iter()
1776 .map(|trigger| trigger.id.as_str())
1777 .collect();
1778 let mut node_refs = BTreeSet::new();
1779 for (key, capsule) in &bundle.prompt_capsules {
1780 let path = format!("prompt_capsules.{key}");
1781 if capsule.id.trim().is_empty() {
1782 push_error(
1783 report,
1784 format!("{path}.id"),
1785 "prompt capsule id is required",
1786 None,
1787 );
1788 } else if capsule.id != *key {
1789 push_error(
1790 report,
1791 format!("{path}.id"),
1792 "prompt capsule id must match its map key",
1793 None,
1794 );
1795 }
1796 if capsule.prompt.trim().is_empty() {
1797 push_error(
1798 report,
1799 format!("{path}.prompt"),
1800 "prompt capsule prompt is required",
1801 Some(capsule.node_id.clone()),
1802 );
1803 }
1804 if !graph.nodes.contains_key(&capsule.node_id) {
1805 push_error(
1806 report,
1807 format!("{path}.node_id"),
1808 format!(
1809 "prompt capsule references unknown node: {}",
1810 capsule.node_id
1811 ),
1812 Some(capsule.node_id.clone()),
1813 );
1814 }
1815 if !capsule.node_id.is_empty() && !node_refs.insert(capsule.node_id.clone()) {
1816 push_error(
1817 report,
1818 format!("{path}.node_id"),
1819 format!("multiple prompt capsules target node {}", capsule.node_id),
1820 Some(capsule.node_id.clone()),
1821 );
1822 }
1823 if let Some(trigger_id) = capsule.trigger_id.as_deref() {
1824 if !trigger_ids.contains(trigger_id) {
1825 push_error(
1826 report,
1827 format!("{path}.trigger_id"),
1828 format!("prompt capsule references unknown trigger: {trigger_id}"),
1829 Some(capsule.node_id.clone()),
1830 );
1831 }
1832 }
1833 }
1834}
1835
1836fn validate_policy(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1837 if !matches!(
1838 bundle.policy.autonomy_tier.as_str(),
1839 "shadow" | "suggest" | "act_with_approval" | "act_auto"
1840 ) {
1841 push_error(
1842 report,
1843 "policy.autonomy_tier",
1844 "autonomy_tier must be shadow, suggest, act_with_approval, or act_auto",
1845 None,
1846 );
1847 }
1848 if bundle.policy.retry.max_attempts == 0 {
1849 push_error(
1850 report,
1851 "policy.retry.max_attempts",
1852 "retry.max_attempts must be at least 1",
1853 None,
1854 );
1855 }
1856 if !matches!(
1857 bundle.policy.catchup.mode.as_str(),
1858 "none" | "latest" | "all"
1859 ) {
1860 push_error(
1861 report,
1862 "policy.catchup.mode",
1863 "catchup.mode must be none, latest, or all",
1864 None,
1865 );
1866 }
1867}
1868
1869fn validate_connectors(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1870 let mut ids = BTreeSet::new();
1871 let provider_ids: BTreeSet<&str> = bundle
1872 .connectors
1873 .iter()
1874 .map(|connector| connector.provider_id.as_str())
1875 .collect();
1876 for (index, connector) in bundle.connectors.iter().enumerate() {
1877 let path = format!("connectors[{index}]");
1878 if connector.id.trim().is_empty() {
1879 push_error(
1880 report,
1881 format!("{path}.id"),
1882 "connector id is required",
1883 None,
1884 );
1885 } else if !ids.insert(connector.id.clone()) {
1886 push_error(
1887 report,
1888 format!("{path}.id"),
1889 format!("duplicate connector id: {}", connector.id),
1890 None,
1891 );
1892 }
1893 if connector.provider_id.trim().is_empty() {
1894 push_error(
1895 report,
1896 format!("{path}.provider_id"),
1897 "connector provider_id is required",
1898 None,
1899 );
1900 }
1901 }
1902 for trigger in &bundle.triggers {
1903 if let Some(provider) = trigger.provider.as_deref() {
1904 if !provider_ids.contains(provider) {
1905 push_warning(
1906 report,
1907 "connectors",
1908 format!(
1909 "trigger {} references provider {provider} with no connector requirement",
1910 trigger.id
1911 ),
1912 trigger.node_id.clone(),
1913 );
1914 }
1915 }
1916 }
1917}
1918
1919fn validate_environment(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1920 if !matches!(
1921 bundle.environment.worktree_policy.as_str(),
1922 "reuse_current" | "new_worktree" | "host_managed"
1923 ) {
1924 push_error(
1925 report,
1926 "environment.worktree_policy",
1927 "worktree_policy must be reuse_current, new_worktree, or host_managed",
1928 None,
1929 );
1930 }
1931}
1932
1933fn workflow_diagnostic_node_id(message: &str, graph: &WorkflowGraph) -> Option<String> {
1934 for prefix in [
1935 "node is unreachable: ",
1936 "edge.from references unknown node: ",
1937 "edge.to references unknown node: ",
1938 "entry node does not exist: ",
1939 ] {
1940 if let Some(node_id) = message.strip_prefix(prefix) {
1941 return Some(node_id.to_string());
1942 }
1943 }
1944 if let Some(rest) = message.strip_prefix("node ") {
1945 if let Some((node_id, _)) = rest.split_once(':') {
1946 return Some(node_id.to_string());
1947 }
1948 }
1949 graph
1950 .nodes
1951 .keys()
1952 .find(|node_id| message.contains(&format!("node {node_id}:")))
1953 .cloned()
1954}
1955
1956fn workflow_graph_id(node_id: &str) -> String {
1957 format!("node/{node_id}")
1958}
1959
1960fn trigger_graph_id(trigger_id: &str) -> String {
1961 format!("trigger/{trigger_id}")
1962}
1963
1964fn connector_graph_id(connector_id: &str) -> String {
1965 format!("connector/{connector_id}")
1966}
1967
1968fn catchup_graph_id() -> String {
1969 "policy/catchup".to_string()
1970}
1971
1972fn dlq_graph_id() -> String {
1973 "policy/dlq".to_string()
1974}
1975
1976fn terminal_completed_graph_id() -> String {
1977 "terminal/completed".to_string()
1978}
1979
1980fn terminal_failed_graph_id() -> String {
1981 "terminal/failed".to_string()
1982}
1983
1984fn workflow_node_type(kind: &str) -> String {
1985 match kind {
1986 "action" => "action",
1987 "stage" | "agent" => "agent",
1988 "subagent" | "worker" => "subagent",
1989 "wait" | "waitpoint" | "delay" => "wait",
1990 "approval" | "hitl" => "approval",
1991 "connector" | "connector_call" => "connector_call",
1992 "notification" | "notify" => "notification",
1993 "terminal" | "success" | "failure" => "terminal",
1994 other if other.trim().is_empty() => "agent",
1995 other => other,
1996 }
1997 .to_string()
1998}
1999
2000fn workflow_node_label(node_id: &str, node: &super::WorkflowNode) -> String {
2001 node.task_label
2002 .clone()
2003 .or_else(|| node.prompt.clone())
2004 .map(|label| label.trim().to_string())
2005 .filter(|label| !label.is_empty())
2006 .unwrap_or_else(|| node_id.to_string())
2007}
2008
2009fn trigger_label(trigger: &WorkflowBundleTrigger) -> String {
2010 if !trigger.events.is_empty() {
2011 format!("{}: {}", trigger.kind, trigger.events.join(", "))
2012 } else if let Some(schedule) = trigger.schedule.as_deref() {
2013 format!("cron: {schedule}")
2014 } else if let Some(delay) = trigger.delay.as_deref() {
2015 format!("delay: {delay}")
2016 } else {
2017 trigger.id.clone()
2018 }
2019}
2020
2021fn connector_label(connector: &ConnectorRequirement) -> String {
2022 if connector.provider_id.is_empty() || connector.provider_id == connector.id {
2023 connector.id.clone()
2024 } else {
2025 format!("{} ({})", connector.id, connector.provider_id)
2026 }
2027}
2028
2029fn editable_field(
2030 id: impl Into<String>,
2031 label: impl Into<String>,
2032 json_pointer: impl Into<String>,
2033 value_type: impl Into<String>,
2034 required: bool,
2035 enum_values: &[&str],
2036) -> WorkflowBundleEditableField {
2037 WorkflowBundleEditableField {
2038 id: id.into(),
2039 label: label.into(),
2040 json_pointer: json_pointer.into(),
2041 value_type: value_type.into(),
2042 required,
2043 enum_values: enum_values
2044 .iter()
2045 .map(|value| (*value).to_string())
2046 .collect(),
2047 }
2048}
2049
2050fn json_pointer_segment(value: &str) -> String {
2051 value.replace('~', "~0").replace('/', "~1")
2052}
2053
2054fn trigger_editable_fields(
2055 index: usize,
2056 trigger: &WorkflowBundleTrigger,
2057) -> Vec<WorkflowBundleEditableField> {
2058 let base = format!("/triggers/{index}");
2059 let mut fields = vec![
2060 editable_field(
2061 format!("trigger.{}.kind", trigger.id),
2062 "Trigger kind",
2063 format!("{base}/kind"),
2064 "enum",
2065 true,
2066 &["github", "cron", "delay", "manual", "webhook", "mcp"],
2067 ),
2068 editable_field(
2069 format!("trigger.{}.node_id", trigger.id),
2070 "Target node",
2071 format!("{base}/node_id"),
2072 "string",
2073 false,
2074 &[],
2075 ),
2076 ];
2077 if trigger.provider.is_some() || trigger.kind == "github" {
2078 fields.push(editable_field(
2079 format!("trigger.{}.provider", trigger.id),
2080 "Provider",
2081 format!("{base}/provider"),
2082 "string",
2083 trigger.kind == "github",
2084 &[],
2085 ));
2086 }
2087 for (field, label, value_type) in [
2088 ("events", "Events", "list"),
2089 ("schedule", "Schedule", "string"),
2090 ("delay", "Delay", "string"),
2091 ("webhook_path", "Webhook path", "string"),
2092 ("mcp_tool", "MCP tool", "string"),
2093 ("resume_key", "Resume key", "string"),
2094 ("metadata", "Metadata", "object"),
2095 ] {
2096 fields.push(editable_field(
2097 format!("trigger.{}.{}", trigger.id, field),
2098 label,
2099 format!("{base}/{field}"),
2100 value_type,
2101 false,
2102 &[],
2103 ));
2104 }
2105 fields
2106}
2107
2108fn workflow_node_editable_fields(
2109 node_id: &str,
2110 capsule_id: Option<&String>,
2111) -> Vec<WorkflowBundleEditableField> {
2112 let escaped_node = json_pointer_segment(node_id);
2113 let mut fields = vec![
2114 editable_field(
2115 format!("workflow.{node_id}.task_label"),
2116 "Task label",
2117 format!("/workflow/nodes/{escaped_node}/task_label"),
2118 "string",
2119 false,
2120 &[],
2121 ),
2122 editable_field(
2123 format!("workflow.{node_id}.prompt"),
2124 "Prompt",
2125 format!("/workflow/nodes/{escaped_node}/prompt"),
2126 "string",
2127 false,
2128 &[],
2129 ),
2130 editable_field(
2131 format!("workflow.{node_id}.system"),
2132 "System prompt",
2133 format!("/workflow/nodes/{escaped_node}/system"),
2134 "string",
2135 false,
2136 &[],
2137 ),
2138 editable_field(
2139 format!("workflow.{node_id}.model_policy"),
2140 "Model policy",
2141 format!("/workflow/nodes/{escaped_node}/model_policy"),
2142 "object",
2143 false,
2144 &[],
2145 ),
2146 editable_field(
2147 format!("workflow.{node_id}.tools"),
2148 "Tool policy",
2149 format!("/workflow/nodes/{escaped_node}/tools"),
2150 "any",
2151 false,
2152 &[],
2153 ),
2154 editable_field(
2155 format!("workflow.{node_id}.capability_policy"),
2156 "Capability policy",
2157 format!("/workflow/nodes/{escaped_node}/capability_policy"),
2158 "object",
2159 false,
2160 &[],
2161 ),
2162 editable_field(
2163 format!("workflow.{node_id}.approval_policy"),
2164 "Approval policy",
2165 format!("/workflow/nodes/{escaped_node}/approval_policy"),
2166 "object",
2167 false,
2168 &[],
2169 ),
2170 editable_field(
2171 format!("workflow.{node_id}.retry_policy"),
2172 "Retry policy",
2173 format!("/workflow/nodes/{escaped_node}/retry_policy"),
2174 "object",
2175 false,
2176 &[],
2177 ),
2178 ];
2179 if let Some(capsule_id) = capsule_id {
2180 let escaped_capsule = json_pointer_segment(capsule_id);
2181 fields.extend([
2182 editable_field(
2183 format!("prompt_capsule.{capsule_id}.prompt"),
2184 "Prompt capsule",
2185 format!("/prompt_capsules/{escaped_capsule}/prompt"),
2186 "string",
2187 true,
2188 &[],
2189 ),
2190 editable_field(
2191 format!("prompt_capsule.{capsule_id}.system"),
2192 "Prompt capsule system",
2193 format!("/prompt_capsules/{escaped_capsule}/system"),
2194 "string",
2195 false,
2196 &[],
2197 ),
2198 editable_field(
2199 format!("prompt_capsule.{capsule_id}.context"),
2200 "Prompt capsule context",
2201 format!("/prompt_capsules/{escaped_capsule}/context"),
2202 "object",
2203 false,
2204 &[],
2205 ),
2206 editable_field(
2207 format!("prompt_capsule.{capsule_id}.trigger_id"),
2208 "Prompt capsule trigger",
2209 format!("/prompt_capsules/{escaped_capsule}/trigger_id"),
2210 "string",
2211 false,
2212 &[],
2213 ),
2214 ]);
2215 }
2216 fields
2217}
2218
2219fn connector_editable_fields(
2220 index: usize,
2221 connector: &ConnectorRequirement,
2222) -> Vec<WorkflowBundleEditableField> {
2223 let base = format!("/connectors/{index}");
2224 [
2225 ("id", "Connector id", "string", true),
2226 ("provider_id", "Provider id", "string", true),
2227 ("scopes", "Scopes", "list", false),
2228 ("setup_required", "Setup required", "bool", false),
2229 ("status_required", "Status required", "bool", false),
2230 ]
2231 .into_iter()
2232 .map(|(field, label, value_type, required)| {
2233 editable_field(
2234 format!("connector.{}.{}", connector.id, field),
2235 label,
2236 format!("{base}/{field}"),
2237 value_type,
2238 required,
2239 &[],
2240 )
2241 })
2242 .collect()
2243}
2244
2245fn retry_editable_fields() -> Vec<WorkflowBundleEditableField> {
2246 vec![
2247 editable_field(
2248 "policy.retry.max_attempts",
2249 "Retry attempts",
2250 "/policy/retry/max_attempts",
2251 "integer",
2252 true,
2253 &[],
2254 ),
2255 editable_field(
2256 "policy.retry.backoff",
2257 "Retry backoff",
2258 "/policy/retry/backoff",
2259 "string",
2260 true,
2261 &[],
2262 ),
2263 ]
2264}
2265
2266fn catchup_editable_fields() -> Vec<WorkflowBundleEditableField> {
2267 vec![
2268 editable_field(
2269 "policy.catchup.mode",
2270 "Catchup mode",
2271 "/policy/catchup/mode",
2272 "enum",
2273 true,
2274 &["none", "latest", "all"],
2275 ),
2276 editable_field(
2277 "policy.catchup.max_events",
2278 "Catchup max events",
2279 "/policy/catchup/max_events",
2280 "integer",
2281 false,
2282 &[],
2283 ),
2284 ]
2285}
2286
2287fn render_workflow_bundle_mermaid(
2288 nodes: &[WorkflowBundleGraphNode],
2289 edges: &[WorkflowBundleGraphEdge],
2290) -> String {
2291 let mut lines = vec!["flowchart TD".to_string()];
2292 for node in nodes {
2293 lines.push(format!(
2294 " {}[\"{}\"]",
2295 mermaid_id(&node.id),
2296 mermaid_label(&format!("{}: {}", node.node_type, node.label))
2297 ));
2298 }
2299 for edge in edges {
2300 let label = edge
2301 .label
2302 .as_deref()
2303 .or(edge.branch.as_deref())
2304 .map(mermaid_label);
2305 match label {
2306 Some(label) if !label.is_empty() => lines.push(format!(
2307 " {} -->|{}| {}",
2308 mermaid_id(&edge.from),
2309 label,
2310 mermaid_id(&edge.to)
2311 )),
2312 _ => lines.push(format!(
2313 " {} --> {}",
2314 mermaid_id(&edge.from),
2315 mermaid_id(&edge.to)
2316 )),
2317 }
2318 }
2319 lines.join("\n")
2320}
2321
2322fn mermaid_id(value: &str) -> String {
2323 let digest = Sha256::digest(value.as_bytes());
2324 let suffix = digest
2325 .iter()
2326 .take(4)
2327 .map(|byte| format!("{byte:02x}"))
2328 .collect::<String>();
2329 let mut out = format!("n_{suffix}_");
2330 for ch in value.chars() {
2331 if ch.is_ascii_alphanumeric() {
2332 out.push(ch);
2333 } else {
2334 out.push('_');
2335 }
2336 }
2337 out
2338}
2339
2340fn mermaid_label(value: &str) -> String {
2341 value
2342 .replace('\\', "\\\\")
2343 .replace('"', "\\\"")
2344 .replace('\n', " ")
2345}
2346
2347fn triggers_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, Vec<String>> {
2348 let mut by_node: BTreeMap<String, Vec<String>> = BTreeMap::new();
2349 for trigger in &bundle.triggers {
2350 if let Some(node_id) = trigger.node_id.as_ref() {
2351 by_node
2352 .entry(node_id.clone())
2353 .or_default()
2354 .push(trigger.id.clone());
2355 }
2356 }
2357 by_node
2358}
2359
2360fn capsules_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, String> {
2361 bundle
2362 .prompt_capsules
2363 .iter()
2364 .map(|(id, capsule)| (capsule.node_id.clone(), id.clone()))
2365 .collect()
2366}
2367
2368fn execution_order(graph: &WorkflowGraph) -> Vec<String> {
2369 let outgoing =
2370 graph
2371 .edges
2372 .iter()
2373 .fold(BTreeMap::<String, Vec<String>>::new(), |mut acc, edge| {
2374 acc.entry(edge.from.clone())
2375 .or_default()
2376 .push(edge.to.clone());
2377 acc
2378 });
2379 let mut seen = BTreeSet::new();
2380 let mut queue = VecDeque::from([graph.entry.clone()]);
2381 let mut order = Vec::new();
2382 while let Some(node_id) = queue.pop_front() {
2383 if !graph.nodes.contains_key(&node_id) || !seen.insert(node_id.clone()) {
2384 continue;
2385 }
2386 order.push(node_id.clone());
2387 if let Some(next) = outgoing.get(&node_id) {
2388 let mut next = next.clone();
2389 next.sort();
2390 for child in next {
2391 queue.push_back(child);
2392 }
2393 }
2394 }
2395 order
2396}
2397
2398fn default_run_id(bundle: &WorkflowBundle, graph_digest: &str) -> String {
2399 let suffix = graph_digest
2400 .strip_prefix("sha256:")
2401 .unwrap_or(graph_digest)
2402 .chars()
2403 .take(12)
2404 .collect::<String>();
2405 format!("bundle_run_{}_{}", sanitize_id(&bundle.id), suffix)
2406}
2407
2408fn sanitize_id(value: &str) -> String {
2409 value
2410 .chars()
2411 .map(|ch| {
2412 if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' {
2413 ch
2414 } else {
2415 '_'
2416 }
2417 })
2418 .collect()
2419}
2420
2421fn push_error(
2422 report: &mut WorkflowBundleValidationReport,
2423 path: impl Into<String>,
2424 message: impl Into<String>,
2425 node_id: Option<String>,
2426) {
2427 report.errors.push(WorkflowBundleDiagnostic {
2428 severity: "error".to_string(),
2429 path: path.into(),
2430 message: message.into(),
2431 node_id,
2432 });
2433}
2434
2435fn push_warning(
2436 report: &mut WorkflowBundleValidationReport,
2437 path: impl Into<String>,
2438 message: impl Into<String>,
2439 node_id: Option<String>,
2440) {
2441 report.warnings.push(WorkflowBundleDiagnostic {
2442 severity: "warning".to_string(),
2443 path: path.into(),
2444 message: message.into(),
2445 node_id,
2446 });
2447}
2448
2449#[cfg(test)]
2450#[path = "workflow_bundle_tests.rs"]
2451mod workflow_bundle_tests;