1use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::fs;
5use std::path::Path;
6
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9
10use super::{validate_workflow, WorkflowEdge, WorkflowGraph};
11
/// Version of the bundle JSON schema this module reads and writes; bundles
/// with any other `schema_version` fail validation.
pub const WORKFLOW_BUNDLE_SCHEMA_VERSION: u32 = 1;
/// `receipt_type` value stamped on every receipt produced by `run_workflow_bundle`.
pub const WORKFLOW_BUNDLE_RECEIPT_TYPE: &str = "harn.workflow_bundle.run";
14
/// A portable, self-contained workflow description: the graph plus the
/// triggers, prompts, policy, connector and environment requirements needed
/// to validate, preview, and replay it.
///
/// Every field falls back to its default via `#[serde(default)]`, so partial
/// JSON documents still deserialize (and then fail validation explicitly).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowBundle {
    // Must equal WORKFLOW_BUNDLE_SCHEMA_VERSION; checked in validation.
    pub schema_version: u32,
    // Stable bundle identifier; validation requires it to be non-empty.
    pub id: String,
    // Optional human-readable name; not validated.
    pub name: Option<String>,
    // Bundle version string; validation requires it to be non-empty.
    pub version: String,
    // Event sources that can start a run of this workflow.
    pub triggers: Vec<WorkflowBundleTrigger>,
    // The workflow graph being bundled (canonicalized before use).
    pub workflow: WorkflowGraph,
    // Prompt capsules keyed by capsule id; each capsule's `id` must match its key.
    pub prompt_capsules: BTreeMap<String, PromptCapsule>,
    // Autonomy / retry / catch-up policy applied to runs.
    pub policy: WorkflowBundlePolicy,
    // External connector requirements declared by the bundle.
    pub connectors: Vec<ConnectorRequirement>,
    // Execution-environment requirements (worktree policy, command gates).
    pub environment: EnvironmentRequirements,
    // Replay metadata pinning a prior run (run id, event ids, graph digest).
    pub receipts: WorkflowBundleReplayMetadata,
    // Free-form extra metadata; not interpreted by this module.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
31
/// Declares one way a bundle run can be started.
///
/// `kind` selects the trigger family — validation accepts `github`, `cron`,
/// `delay`, `webhook`, `mcp`, and `manual` — and each family requires its
/// matching field below (e.g. `cron` requires `schedule`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleTrigger {
    // Unique within the bundle; required to be non-empty.
    pub id: String,
    // Trigger family (see list above); required.
    pub kind: String,
    // Provider id; must be "github" for github triggers, and is warned about
    // if no connector requirement declares the same provider_id.
    pub provider: Option<String>,
    // Event names; github triggers need at least one.
    pub events: Vec<String>,
    // Required for `cron` triggers.
    pub schedule: Option<String>,
    // Required for `delay` triggers.
    pub delay: Option<String>,
    // Required for `webhook` triggers.
    pub webhook_path: Option<String>,
    // Required for `mcp` triggers.
    pub mcp_tool: Option<String>,
    // NOTE(review): not referenced by validation in this module — presumably
    // used by the runtime to resume paused runs; confirm against the host.
    pub resume_key: Option<String>,
    // Node this trigger targets; when set it must exist in the graph.
    pub node_id: Option<String>,
    // Free-form trigger metadata; not interpreted here.
    pub metadata: BTreeMap<String, String>,
}
47
/// A prompt bound to a single workflow node (and optionally to a trigger).
///
/// Validation enforces: `id` matches the map key in
/// `WorkflowBundle::prompt_capsules`, `prompt` is non-empty, `node_id` names
/// an existing node, at most one capsule targets any node, and `trigger_id`
/// (if set) names a declared trigger.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct PromptCapsule {
    // Capsule id; must equal its key in the bundle map.
    pub id: String,
    // Target node; must exist in the workflow graph.
    pub node_id: String,
    // Optional trigger association; must reference a declared trigger.
    pub trigger_id: Option<String>,
    // The prompt text; required to be non-empty.
    pub prompt: String,
    // Optional system prompt text — assumed; not interpreted in this module.
    pub system: Option<String>,
    // Free-form context values passed along with the prompt.
    pub context: BTreeMap<String, serde_json::Value>,
}
58
/// Run-time policy attached to a bundle.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowBundlePolicy {
    // One of "shadow", "suggest", "act_with_approval", "act_auto"
    // (enforced by validate_policy).
    pub autonomy_tier: String,
    // Per-tool policy overrides; values are opaque to this module.
    pub tool_policy: BTreeMap<String, serde_json::Value>,
    // NOTE(review): entries are not cross-checked by validation; presumably
    // tool or node identifiers that always require approval — confirm.
    pub approval_required: Vec<String>,
    // Retry behavior; max_attempts must be >= 1.
    pub retry: RetryPolicySpec,
    // Missed-event catch-up behavior.
    pub catchup: CatchupPolicySpec,
}
68
69impl Default for WorkflowBundlePolicy {
70 fn default() -> Self {
71 Self {
72 autonomy_tier: "act_with_approval".to_string(),
73 tool_policy: BTreeMap::new(),
74 approval_required: Vec::new(),
75 retry: RetryPolicySpec::default(),
76 catchup: CatchupPolicySpec::default(),
77 }
78 }
79}
80
/// Retry configuration for a run.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RetryPolicySpec {
    // Total attempts including the first; validation requires >= 1.
    pub max_attempts: u32,
    // Backoff strategy name; defaults to "none". Other values are not
    // validated in this module.
    pub backoff: String,
}
87
88impl Default for RetryPolicySpec {
89 fn default() -> Self {
90 Self {
91 max_attempts: 1,
92 backoff: "none".to_string(),
93 }
94 }
95}
96
/// Policy for handling events missed while the workflow was not running.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CatchupPolicySpec {
    // One of "none", "latest", "all" (enforced by validate_policy).
    pub mode: String,
    // Optional cap on how many missed events to replay; not validated here.
    pub max_events: Option<u32>,
}
103
104impl Default for CatchupPolicySpec {
105 fn default() -> Self {
106 Self {
107 mode: "latest".to_string(),
108 max_events: Some(1),
109 }
110 }
111}
112
/// An external connector the bundle depends on.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ConnectorRequirement {
    // Unique within the bundle; required to be non-empty.
    pub id: String,
    // Provider identifier; required, and matched against trigger providers
    // (triggers naming an undeclared provider produce a warning).
    pub provider_id: String,
    // OAuth-style scope names — assumed; not validated in this module.
    pub scopes: Vec<String>,
    // Flags consumed by the host; their semantics are not checked here.
    pub setup_required: bool,
    pub status_required: bool,
}
122
/// Execution-environment requirements for running the bundle.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EnvironmentRequirements {
    // Optional named repo setup profile; not validated here.
    pub repo_setup_profile: Option<String>,
    // One of "reuse_current", "new_worktree", "host_managed"
    // (enforced by validate_environment).
    pub worktree_policy: String,
    // Command gate names — assumed to be consumed by the host; not checked here.
    pub command_gates: Vec<String>,
}
130
131impl Default for EnvironmentRequirements {
132 fn default() -> Self {
133 Self {
134 repo_setup_profile: None,
135 worktree_policy: "host_managed".to_string(),
136 command_gates: Vec::new(),
137 }
138 }
139}
140
/// Metadata that pins a bundle to a previously observed run for replay.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleReplayMetadata {
    // When set, run_workflow_bundle reuses this id instead of deriving one.
    pub run_id: Option<String>,
    // Seed event ids copied into the run receipt.
    pub event_ids: Vec<String>,
    // When set, must equal the canonical graph's version (validated).
    pub workflow_version: Option<usize>,
    // When set, must equal the computed graph digest (validated).
    pub graph_digest: Option<String>,
}
149
/// One validation finding.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleDiagnostic {
    // "error" or "warning" (set by push_error / push_warning).
    pub severity: String,
    // Dotted/indexed location within the bundle, e.g. "triggers[0].id".
    pub path: String,
    // Human-readable description of the finding.
    pub message: String,
    // Workflow node the finding relates to, when applicable.
    pub node_id: Option<String>,
}
158
/// Aggregated result of validate_workflow_bundle.
///
/// `valid` is true iff `errors` is empty; warnings never fail validation.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleValidationReport {
    pub valid: bool,
    pub bundle_id: String,
    // Id of the canonicalized workflow graph.
    pub workflow_id: String,
    // "sha256:<hex>" digest of the canonical graph.
    pub graph_digest: String,
    pub errors: Vec<WorkflowBundleDiagnostic>,
    pub warnings: Vec<WorkflowBundleDiagnostic>,
}
169
/// Summary of a bundle produced by preview_workflow_bundle: the validation
/// report plus per-node trigger/capsule/edge information over the canonical
/// graph. Intended for display; no fields are re-validated on deserialize.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct WorkflowBundlePreview {
    pub schema_version: u32,
    pub bundle_id: String,
    pub bundle_version: String,
    pub workflow_id: String,
    pub workflow_version: usize,
    pub graph_digest: String,
    pub validation: WorkflowBundleValidationReport,
    pub triggers: Vec<WorkflowBundleTrigger>,
    pub connectors: Vec<ConnectorRequirement>,
    pub environment: EnvironmentRequirements,
    // One entry per graph node, in node-id order.
    pub nodes: Vec<WorkflowBundlePreviewNode>,
    // Canonical (deterministically sorted) edge list.
    pub edges: Vec<WorkflowEdge>,
}
185
/// Per-node entry in a bundle preview.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundlePreviewNode {
    pub id: String,
    pub kind: String,
    // The node's task_label, when present.
    pub label: Option<String>,
    // Id of the prompt capsule targeting this node, if any.
    pub prompt_capsule: Option<String>,
    // Ids of triggers whose node_id points at this node.
    pub trigger_ids: Vec<String>,
    // Sorted, deduplicated ids of downstream nodes.
    pub outgoing: Vec<String>,
}
195
/// Parameters for run_workflow_bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleRunRequest {
    // Trigger to attribute the run to; must name a declared trigger. When
    // None, the bundle's first trigger (if any) is used.
    pub trigger_id: Option<String>,
    // Event id to record on the receipt; appended only if not already present
    // in the bundle's replay event_ids.
    pub event_id: Option<String>,
}
202
/// Receipt describing one (simulated) bundle run.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct WorkflowBundleRunReceipt {
    pub schema_version: u32,
    // Always WORKFLOW_BUNDLE_RECEIPT_TYPE.
    pub receipt_type: String,
    pub bundle_id: String,
    pub bundle_version: String,
    pub workflow_id: String,
    pub workflow_version: usize,
    pub graph_digest: String,
    // Replayed run id, or a digest-derived default.
    pub run_id: String,
    pub trigger_id: Option<String>,
    pub event_ids: Vec<String>,
    // Currently always "completed" (see run_workflow_bundle).
    pub status: String,
    // Nodes reachable from the entry, in breadth-first order.
    pub executed_nodes: Vec<WorkflowBundleRunNodeReceipt>,
    // Snapshot of the bundle's policy / connectors / environment at run time.
    pub policy: WorkflowBundlePolicy,
    pub connectors: Vec<ConnectorRequirement>,
    pub environment: EnvironmentRequirements,
}
221
/// Per-node entry in a run receipt.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleRunNodeReceipt {
    pub node_id: String,
    pub kind: String,
    // Id of the prompt capsule bound to this node, if any.
    pub prompt_capsule: Option<String>,
    // Currently always "completed" (see run_workflow_bundle).
    pub status: String,
}
229
/// Error returned by bundle I/O (load/parse); wraps a rendered message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WorkflowBundleError {
    pub message: String,
}
234
235impl std::fmt::Display for WorkflowBundleError {
236 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 self.message.fmt(f)
238 }
239}
240
241impl std::error::Error for WorkflowBundleError {}
242
243impl From<std::io::Error> for WorkflowBundleError {
244 fn from(error: std::io::Error) -> Self {
245 Self {
246 message: error.to_string(),
247 }
248 }
249}
250
251impl From<serde_json::Error> for WorkflowBundleError {
252 fn from(error: serde_json::Error) -> Self {
253 Self {
254 message: error.to_string(),
255 }
256 }
257}
258
259pub fn load_workflow_bundle(path: &Path) -> Result<WorkflowBundle, WorkflowBundleError> {
260 let bytes = fs::read(path)?;
261 serde_json::from_slice(&bytes).map_err(Into::into)
262}
263
264pub fn workflow_graph_digest(graph: &WorkflowGraph) -> String {
265 let mut canonical = canonical_workflow_graph(graph);
266 canonical.audit_log.clear();
267 let bytes = serde_json::to_vec(&canonical).expect("workflow graph serializes");
268 let digest = Sha256::digest(bytes);
269 let hex = digest
270 .iter()
271 .map(|byte| format!("{byte:02x}"))
272 .collect::<String>();
273 format!("sha256:{hex}")
274}
275
/// Runs every bundle-level check and returns an aggregated report.
///
/// The workflow is canonicalized first so validation and the reported
/// `graph_digest` describe the same graph that `run_workflow_bundle` would
/// execute. `valid` is true iff no errors were recorded; warnings never fail
/// validation.
pub fn validate_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundleValidationReport {
    let canonical = canonical_workflow_graph(&bundle.workflow);
    let mut report = WorkflowBundleValidationReport {
        valid: true,
        bundle_id: bundle.id.clone(),
        workflow_id: canonical.id.clone(),
        graph_digest: workflow_graph_digest(&canonical),
        errors: Vec::new(),
        warnings: Vec::new(),
    };

    // Bundle-level structural checks.
    validate_bundle_identity(bundle, &canonical, &mut report);
    validate_triggers(bundle, &canonical, &mut report);
    validate_prompt_capsules(bundle, &canonical, &mut report);
    validate_policy(bundle, &mut report);
    validate_connectors(bundle, &mut report);
    validate_environment(bundle, &mut report);

    // Fold the generic graph validation (from super) into this report under
    // the "workflow" path.
    let graph_report = validate_workflow(&canonical, None);
    for error in graph_report.errors {
        push_error(&mut report, "workflow", error, None);
    }
    for warning in graph_report.warnings {
        push_warning(&mut report, "workflow", warning, None);
    }

    // Replay metadata, when present, must pin exactly this graph.
    if let Some(expected) = bundle.receipts.graph_digest.as_deref() {
        if expected != report.graph_digest {
            let actual = report.graph_digest.clone();
            push_error(
                &mut report,
                "receipts.graph_digest",
                format!("graph digest mismatch: expected {expected}, computed {actual}"),
                None,
            );
        }
    }
    if let Some(expected_version) = bundle.receipts.workflow_version {
        if expected_version != canonical.version {
            push_error(
                &mut report,
                "receipts.workflow_version",
                format!(
                    "workflow version mismatch: expected {expected_version}, computed {}",
                    canonical.version
                ),
                None,
            );
        }
    }

    report.valid = report.errors.is_empty();
    report
}
330
/// Builds a display-oriented preview of the bundle: validation results plus,
/// for every node of the canonical graph, its triggers, prompt capsule, and
/// sorted/deduplicated outgoing edges.
///
/// Node order follows the graph's node map iteration order (BTreeMap, so
/// sorted by node id), making the preview deterministic.
pub fn preview_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundlePreview {
    let canonical = canonical_workflow_graph(&bundle.workflow);
    let validation = validate_workflow_bundle(bundle);
    let triggers_by_node = triggers_by_node(bundle);
    let capsules_by_node = capsules_by_node(bundle);
    let mut nodes = Vec::new();

    for (node_id, node) in &canonical.nodes {
        // Collect this node's downstream neighbors, deterministically ordered.
        let mut outgoing = canonical
            .edges
            .iter()
            .filter(|edge| edge.from == *node_id)
            .map(|edge| edge.to.clone())
            .collect::<Vec<_>>();
        outgoing.sort();
        outgoing.dedup();
        nodes.push(WorkflowBundlePreviewNode {
            id: node_id.clone(),
            kind: node.kind.clone(),
            label: node.task_label.clone(),
            prompt_capsule: capsules_by_node.get(node_id).cloned(),
            trigger_ids: triggers_by_node.get(node_id).cloned().unwrap_or_default(),
            outgoing,
        });
    }

    WorkflowBundlePreview {
        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
        bundle_id: bundle.id.clone(),
        bundle_version: bundle.version.clone(),
        workflow_id: canonical.id.clone(),
        workflow_version: canonical.version,
        graph_digest: validation.graph_digest.clone(),
        validation,
        triggers: bundle.triggers.clone(),
        connectors: bundle.connectors.clone(),
        environment: bundle.environment.clone(),
        nodes,
        edges: sorted_edges(&canonical),
    }
}
372
/// Validates the bundle and, on success, simulates a run and returns a receipt.
///
/// "Execution" here only walks the graph breadth-first from the entry node and
/// marks every reachable node `completed`; no prompts are actually evaluated.
///
/// # Errors
/// Returns the validation report (with `valid == false`) when the bundle fails
/// validation, or when `request.trigger_id` names an undeclared trigger.
pub fn run_workflow_bundle(
    bundle: &WorkflowBundle,
    request: WorkflowBundleRunRequest,
) -> Result<WorkflowBundleRunReceipt, WorkflowBundleValidationReport> {
    let validation = validate_workflow_bundle(bundle);
    if !validation.valid {
        return Err(validation);
    }

    let canonical = canonical_workflow_graph(&bundle.workflow);
    // Resolve the trigger: an explicit id must exist; otherwise fall back to
    // the first declared trigger (or none). The guard arm must stay first so
    // unknown ids are rejected before the plain Some arm matches.
    let trigger_id = match request.trigger_id {
        Some(trigger_id)
            if !bundle
                .triggers
                .iter()
                .any(|trigger| trigger.id == trigger_id) =>
        {
            let mut report = validation;
            push_error(
                &mut report,
                "trigger_id",
                format!("unknown trigger id: {trigger_id}"),
                None,
            );
            report.valid = false;
            return Err(report);
        }
        Some(trigger_id) => Some(trigger_id),
        None => bundle.triggers.first().map(|trigger| trigger.id.clone()),
    };
    // Record the requested event on top of any replayed event ids, deduplicated.
    let mut event_ids = bundle.receipts.event_ids.clone();
    if let Some(event_id) = request.event_id {
        if !event_ids.contains(&event_id) {
            event_ids.push(event_id);
        }
    }
    // Reuse a replayed run id when pinned; otherwise derive one from the
    // bundle id and graph digest.
    let run_id = bundle
        .receipts
        .run_id
        .clone()
        .unwrap_or_else(|| default_run_id(bundle, &validation.graph_digest));
    let capsules_by_node = capsules_by_node(bundle);
    let executed_nodes = execution_order(&canonical)
        .into_iter()
        .map(|node_id| {
            let node = canonical
                .nodes
                .get(&node_id)
                .expect("execution order only contains known nodes");
            WorkflowBundleRunNodeReceipt {
                node_id: node_id.clone(),
                kind: node.kind.clone(),
                prompt_capsule: capsules_by_node.get(&node_id).cloned(),
                status: "completed".to_string(),
            }
        })
        .collect();

    Ok(WorkflowBundleRunReceipt {
        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
        receipt_type: WORKFLOW_BUNDLE_RECEIPT_TYPE.to_string(),
        bundle_id: bundle.id.clone(),
        bundle_version: bundle.version.clone(),
        workflow_id: canonical.id,
        workflow_version: canonical.version,
        graph_digest: validation.graph_digest,
        run_id,
        trigger_id,
        event_ids,
        status: "completed".to_string(),
        executed_nodes,
        policy: bundle.policy.clone(),
        connectors: bundle.connectors.clone(),
        environment: bundle.environment.clone(),
    })
}
449
450fn canonical_workflow_graph(graph: &WorkflowGraph) -> WorkflowGraph {
451 let mut canonical = graph.clone();
452 if canonical.type_name.is_empty() {
453 canonical.type_name = "workflow_graph".to_string();
454 }
455 if canonical.version == 0 {
456 canonical.version = 1;
457 }
458 if canonical.entry.is_empty() {
459 canonical.entry = canonical.nodes.keys().next().cloned().unwrap_or_default();
460 }
461 for (node_id, node) in &mut canonical.nodes {
462 if node.id.is_none() {
463 node.id = Some(node_id.clone());
464 }
465 if node.kind.is_empty() {
466 node.kind = "stage".to_string();
467 }
468 if node.retry_policy.max_attempts == 0 {
469 node.retry_policy.max_attempts = 1;
470 }
471 }
472 canonical.edges = sorted_edges(&canonical);
473 canonical
474}
475
476fn sorted_edges(graph: &WorkflowGraph) -> Vec<WorkflowEdge> {
477 let mut edges = graph.edges.clone();
478 edges.sort_by(|left, right| {
479 (
480 &left.from,
481 &left.to,
482 left.branch.as_deref(),
483 left.label.as_deref(),
484 )
485 .cmp(&(
486 &right.from,
487 &right.to,
488 right.branch.as_deref(),
489 right.label.as_deref(),
490 ))
491 });
492 edges
493}
494
/// Checks bundle/graph identity: supported schema version, non-empty bundle
/// id/version, non-empty workflow id, a non-empty node set, and agreement
/// between each node's map key and its inline `id` field.
fn validate_bundle_identity(
    bundle: &WorkflowBundle,
    graph: &WorkflowGraph,
    report: &mut WorkflowBundleValidationReport,
) {
    if bundle.schema_version != WORKFLOW_BUNDLE_SCHEMA_VERSION {
        push_error(
            report,
            "schema_version",
            format!(
                "unsupported schema_version {}; expected {}",
                bundle.schema_version, WORKFLOW_BUNDLE_SCHEMA_VERSION
            ),
            None,
        );
    }
    if bundle.id.trim().is_empty() {
        push_error(report, "id", "bundle id is required", None);
    }
    if bundle.version.trim().is_empty() {
        push_error(report, "version", "bundle version is required", None);
    }
    if graph.id.trim().is_empty() {
        push_error(
            report,
            "workflow.id",
            "workflow id is required for portable bundles",
            None,
        );
    }
    if graph.nodes.is_empty() {
        push_error(
            report,
            "workflow.nodes",
            "workflow must contain nodes",
            None,
        );
    }
    for (node_id, node) in &graph.nodes {
        if node_id.trim().is_empty() {
            push_error(report, "workflow.nodes", "node id is required", None);
        }
        // canonical_workflow_graph fills node.id from the key, so any
        // remaining mismatch came from the source document itself.
        if node.id.as_deref().is_some_and(|id| id != node_id) {
            push_error(
                report,
                format!("workflow.nodes.{node_id}.id"),
                "node id field must match its map key",
                Some(node_id.clone()),
            );
        }
    }
}
547
/// Checks every declared trigger: unique non-empty ids, the per-kind required
/// fields (see match below), and that `node_id` (when set) names a real node.
/// A bundle with no triggers only warns — it may still be run manually.
fn validate_triggers(
    bundle: &WorkflowBundle,
    graph: &WorkflowGraph,
    report: &mut WorkflowBundleValidationReport,
) {
    if bundle.triggers.is_empty() {
        push_warning(report, "triggers", "bundle declares no triggers", None);
    }
    let mut ids = BTreeSet::new();
    for (index, trigger) in bundle.triggers.iter().enumerate() {
        let path = format!("triggers[{index}]");
        if trigger.id.trim().is_empty() {
            push_error(report, format!("{path}.id"), "trigger id is required", None);
        } else if !ids.insert(trigger.id.clone()) {
            push_error(
                report,
                format!("{path}.id"),
                format!("duplicate trigger id: {}", trigger.id),
                None,
            );
        }
        // Per-kind requirements. Arm order matters: the guarded arms check a
        // known kind with a missing field; a known kind with its field present
        // falls through the final `other` guard to the catch-all `_ => {}`.
        match trigger.kind.as_str() {
            "github" => {
                if trigger.provider.as_deref() != Some("github") {
                    push_error(
                        report,
                        format!("{path}.provider"),
                        "github triggers require provider=\"github\"",
                        None,
                    );
                }
                if trigger.events.is_empty() {
                    push_error(
                        report,
                        format!("{path}.events"),
                        "github triggers require at least one event",
                        None,
                    );
                }
            }
            "cron" if trigger.schedule.is_none() => push_error(
                report,
                format!("{path}.schedule"),
                "cron triggers require schedule",
                None,
            ),
            "delay" if trigger.delay.is_none() => push_error(
                report,
                format!("{path}.delay"),
                "delay triggers require delay",
                None,
            ),
            "webhook" if trigger.webhook_path.is_none() => push_error(
                report,
                format!("{path}.webhook_path"),
                "webhook triggers require webhook_path",
                None,
            ),
            "mcp" if trigger.mcp_tool.is_none() => push_error(
                report,
                format!("{path}.mcp_tool"),
                "mcp triggers require mcp_tool",
                None,
            ),
            "manual" => {}
            "" => push_error(
                report,
                format!("{path}.kind"),
                "trigger kind is required",
                None,
            ),
            other
                if !matches!(
                    other,
                    "github" | "cron" | "delay" | "webhook" | "mcp" | "manual"
                ) =>
            {
                push_error(
                    report,
                    format!("{path}.kind"),
                    format!("unsupported trigger kind: {other}"),
                    None,
                );
            }
            _ => {}
        }
        if let Some(node_id) = trigger.node_id.as_deref() {
            if !graph.nodes.contains_key(node_id) {
                push_error(
                    report,
                    format!("{path}.node_id"),
                    format!("trigger references unknown node: {node_id}"),
                    Some(node_id.to_string()),
                );
            }
        }
    }
}
646
/// Checks every prompt capsule: id matches its map key, prompt is non-empty,
/// node_id names a real node, at most one capsule targets any node, and
/// trigger_id (when set) names a declared trigger.
fn validate_prompt_capsules(
    bundle: &WorkflowBundle,
    graph: &WorkflowGraph,
    report: &mut WorkflowBundleValidationReport,
) {
    let trigger_ids: BTreeSet<&str> = bundle
        .triggers
        .iter()
        .map(|trigger| trigger.id.as_str())
        .collect();
    // Tracks which nodes already have a capsule, to reject duplicates.
    let mut node_refs = BTreeSet::new();
    for (key, capsule) in &bundle.prompt_capsules {
        let path = format!("prompt_capsules.{key}");
        if capsule.id.trim().is_empty() {
            push_error(
                report,
                format!("{path}.id"),
                "prompt capsule id is required",
                None,
            );
        } else if capsule.id != *key {
            push_error(
                report,
                format!("{path}.id"),
                "prompt capsule id must match its map key",
                None,
            );
        }
        if capsule.prompt.trim().is_empty() {
            push_error(
                report,
                format!("{path}.prompt"),
                "prompt capsule prompt is required",
                Some(capsule.node_id.clone()),
            );
        }
        if !graph.nodes.contains_key(&capsule.node_id) {
            push_error(
                report,
                format!("{path}.node_id"),
                format!(
                    "prompt capsule references unknown node: {}",
                    capsule.node_id
                ),
                Some(capsule.node_id.clone()),
            );
        }
        // An empty node_id already failed the lookup above; only dedupe
        // non-empty targets.
        if !capsule.node_id.is_empty() && !node_refs.insert(capsule.node_id.clone()) {
            push_error(
                report,
                format!("{path}.node_id"),
                format!("multiple prompt capsules target node {}", capsule.node_id),
                Some(capsule.node_id.clone()),
            );
        }
        if let Some(trigger_id) = capsule.trigger_id.as_deref() {
            if !trigger_ids.contains(trigger_id) {
                push_error(
                    report,
                    format!("{path}.trigger_id"),
                    format!("prompt capsule references unknown trigger: {trigger_id}"),
                    Some(capsule.node_id.clone()),
                );
            }
        }
    }
}
714
715fn validate_policy(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
716 if !matches!(
717 bundle.policy.autonomy_tier.as_str(),
718 "shadow" | "suggest" | "act_with_approval" | "act_auto"
719 ) {
720 push_error(
721 report,
722 "policy.autonomy_tier",
723 "autonomy_tier must be shadow, suggest, act_with_approval, or act_auto",
724 None,
725 );
726 }
727 if bundle.policy.retry.max_attempts == 0 {
728 push_error(
729 report,
730 "policy.retry.max_attempts",
731 "retry.max_attempts must be at least 1",
732 None,
733 );
734 }
735 if !matches!(
736 bundle.policy.catchup.mode.as_str(),
737 "none" | "latest" | "all"
738 ) {
739 push_error(
740 report,
741 "policy.catchup.mode",
742 "catchup.mode must be none, latest, or all",
743 None,
744 );
745 }
746}
747
/// Checks connector requirements (unique non-empty ids, non-empty provider
/// ids) and warns when a trigger names a provider that no connector
/// requirement declares.
fn validate_connectors(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
    let mut ids = BTreeSet::new();
    // Providers declared by connector requirements; used to cross-check triggers.
    let provider_ids: BTreeSet<&str> = bundle
        .connectors
        .iter()
        .map(|connector| connector.provider_id.as_str())
        .collect();
    for (index, connector) in bundle.connectors.iter().enumerate() {
        let path = format!("connectors[{index}]");
        if connector.id.trim().is_empty() {
            push_error(
                report,
                format!("{path}.id"),
                "connector id is required",
                None,
            );
        } else if !ids.insert(connector.id.clone()) {
            push_error(
                report,
                format!("{path}.id"),
                format!("duplicate connector id: {}", connector.id),
                None,
            );
        }
        if connector.provider_id.trim().is_empty() {
            push_error(
                report,
                format!("{path}.provider_id"),
                "connector provider_id is required",
                None,
            );
        }
    }
    // A missing connector is a warning, not an error: the host may satisfy
    // the provider some other way.
    for trigger in &bundle.triggers {
        if let Some(provider) = trigger.provider.as_deref() {
            if !provider_ids.contains(provider) {
                push_warning(
                    report,
                    "connectors",
                    format!(
                        "trigger {} references provider {provider} with no connector requirement",
                        trigger.id
                    ),
                    trigger.node_id.clone(),
                );
            }
        }
    }
}
797
798fn validate_environment(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
799 if !matches!(
800 bundle.environment.worktree_policy.as_str(),
801 "reuse_current" | "new_worktree" | "host_managed"
802 ) {
803 push_error(
804 report,
805 "environment.worktree_policy",
806 "worktree_policy must be reuse_current, new_worktree, or host_managed",
807 None,
808 );
809 }
810}
811
812fn triggers_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, Vec<String>> {
813 let mut by_node: BTreeMap<String, Vec<String>> = BTreeMap::new();
814 for trigger in &bundle.triggers {
815 if let Some(node_id) = trigger.node_id.as_ref() {
816 by_node
817 .entry(node_id.clone())
818 .or_default()
819 .push(trigger.id.clone());
820 }
821 }
822 by_node
823}
824
825fn capsules_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, String> {
826 bundle
827 .prompt_capsules
828 .iter()
829 .map(|(id, capsule)| (capsule.node_id.clone(), id.clone()))
830 .collect()
831}
832
833fn execution_order(graph: &WorkflowGraph) -> Vec<String> {
834 let outgoing =
835 graph
836 .edges
837 .iter()
838 .fold(BTreeMap::<String, Vec<String>>::new(), |mut acc, edge| {
839 acc.entry(edge.from.clone())
840 .or_default()
841 .push(edge.to.clone());
842 acc
843 });
844 let mut seen = BTreeSet::new();
845 let mut queue = VecDeque::from([graph.entry.clone()]);
846 let mut order = Vec::new();
847 while let Some(node_id) = queue.pop_front() {
848 if !graph.nodes.contains_key(&node_id) || !seen.insert(node_id.clone()) {
849 continue;
850 }
851 order.push(node_id.clone());
852 if let Some(next) = outgoing.get(&node_id) {
853 let mut next = next.clone();
854 next.sort();
855 for child in next {
856 queue.push_back(child);
857 }
858 }
859 }
860 order
861}
862
863fn default_run_id(bundle: &WorkflowBundle, graph_digest: &str) -> String {
864 let suffix = graph_digest
865 .strip_prefix("sha256:")
866 .unwrap_or(graph_digest)
867 .chars()
868 .take(12)
869 .collect::<String>();
870 format!("bundle_run_{}_{}", sanitize_id(&bundle.id), suffix)
871}
872
/// Replaces every character outside `[A-Za-z0-9_-]` with an underscore so the
/// value is safe to embed in a run id.
fn sanitize_id(value: &str) -> String {
    value
        .chars()
        .map(|ch| match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-' => ch,
            _ => '_',
        })
        .collect()
}
885
886fn push_error(
887 report: &mut WorkflowBundleValidationReport,
888 path: impl Into<String>,
889 message: impl Into<String>,
890 node_id: Option<String>,
891) {
892 report.errors.push(WorkflowBundleDiagnostic {
893 severity: "error".to_string(),
894 path: path.into(),
895 message: message.into(),
896 node_id,
897 });
898}
899
900fn push_warning(
901 report: &mut WorkflowBundleValidationReport,
902 path: impl Into<String>,
903 message: impl Into<String>,
904 node_id: Option<String>,
905) {
906 report.warnings.push(WorkflowBundleDiagnostic {
907 severity: "warning".to_string(),
908 path: path.into(),
909 message: message.into(),
910 node_id,
911 });
912}
913
914#[cfg(test)]
915#[path = "workflow_bundle_tests.rs"]
916mod workflow_bundle_tests;