1use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::fs;
5use std::path::Path;
6
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9
10use super::{validate_workflow, WorkflowEdge, WorkflowGraph};
11
/// Schema version stamped into previews, graph exports, and run receipts;
/// bundles declaring any other `schema_version` fail validation.
pub const WORKFLOW_BUNDLE_SCHEMA_VERSION: u32 = 1;
/// `receipt_type` discriminator written into workflow bundle run receipts.
pub const WORKFLOW_BUNDLE_RECEIPT_TYPE: &str = "harn.workflow_bundle.run";
14
/// Portable description of a workflow: the graph plus everything needed to
/// trigger, gate, connect, and replay it outside its original host.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowBundle {
    /// Must equal [`WORKFLOW_BUNDLE_SCHEMA_VERSION`] to pass validation.
    pub schema_version: u32,
    pub id: String,
    pub name: Option<String>,
    pub version: String,
    /// Declarative entry points (cron, webhook, github, mcp, delay, manual).
    pub triggers: Vec<WorkflowBundleTrigger>,
    pub workflow: WorkflowGraph,
    /// Keyed by capsule id; each capsule targets exactly one workflow node.
    pub prompt_capsules: BTreeMap<String, PromptCapsule>,
    pub policy: WorkflowBundlePolicy,
    pub connectors: Vec<ConnectorRequirement>,
    pub environment: EnvironmentRequirements,
    /// Replay pins (run id, event ids, expected digest/version).
    pub receipts: WorkflowBundleReplayMetadata,
    /// Free-form extension data; not interpreted by validation.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
31
/// Declarative entry point for a bundle. `kind` selects which optional field
/// is mandatory (cron→`schedule`, delay→`delay`, webhook→`webhook_path`,
/// mcp→`mcp_tool`, github→`provider`+`events`); see `validate_triggers`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleTrigger {
    pub id: String,
    pub kind: String,
    /// Provider name used to bind the trigger to a connector requirement.
    pub provider: Option<String>,
    pub events: Vec<String>,
    pub schedule: Option<String>,
    pub delay: Option<String>,
    pub webhook_path: Option<String>,
    pub mcp_tool: Option<String>,
    pub resume_key: Option<String>,
    /// Workflow node to dispatch to; defaults to the graph entry when unset.
    pub node_id: Option<String>,
    pub metadata: BTreeMap<String, String>,
}
47
/// Prompt payload attached to a single workflow node. `id` must match the
/// capsule's key in `WorkflowBundle::prompt_capsules`, and at most one
/// capsule may target a given `node_id`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct PromptCapsule {
    pub id: String,
    pub node_id: String,
    /// Optional trigger association; must reference a declared trigger id.
    pub trigger_id: Option<String>,
    pub prompt: String,
    pub system: Option<String>,
    pub context: BTreeMap<String, serde_json::Value>,
}
58
/// Execution policy for a bundle: autonomy tier (one of `shadow`,
/// `suggest`, `act_with_approval`, `act_auto`), per-tool overrides, nodes
/// requiring human approval, and retry / catch-up behavior.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowBundlePolicy {
    pub autonomy_tier: String,
    pub tool_policy: BTreeMap<String, serde_json::Value>,
    pub approval_required: Vec<String>,
    pub retry: RetryPolicySpec,
    pub catchup: CatchupPolicySpec,
}
68
69impl Default for WorkflowBundlePolicy {
70 fn default() -> Self {
71 Self {
72 autonomy_tier: "act_with_approval".to_string(),
73 tool_policy: BTreeMap::new(),
74 approval_required: Vec::new(),
75 retry: RetryPolicySpec::default(),
76 catchup: CatchupPolicySpec::default(),
77 }
78 }
79}
80
/// Retry configuration. `max_attempts` must be at least 1; values above 1
/// enable the dead-letter-queue path in graph exports.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RetryPolicySpec {
    pub max_attempts: u32,
    /// Backoff strategy name; defaults to "none".
    pub backoff: String,
}
87
88impl Default for RetryPolicySpec {
89 fn default() -> Self {
90 Self {
91 max_attempts: 1,
92 backoff: "none".to_string(),
93 }
94 }
95}
96
/// Missed-event catch-up configuration. `mode` must be one of `none`,
/// `latest`, or `all`; `none` disables the catch-up node in graph exports.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CatchupPolicySpec {
    pub mode: String,
    /// Cap on replayed events; semantics enforced elsewhere — not checked here.
    pub max_events: Option<u32>,
}
103
104impl Default for CatchupPolicySpec {
105 fn default() -> Self {
106 Self {
107 mode: "latest".to_string(),
108 max_events: Some(1),
109 }
110 }
111}
112
/// External integration the bundle depends on. Ids must be unique and
/// `provider_id` non-empty; triggers bind to connectors by provider.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ConnectorRequirement {
    pub id: String,
    pub provider_id: String,
    pub scopes: Vec<String>,
    pub setup_required: bool,
    pub status_required: bool,
}
122
/// Host-environment expectations. `worktree_policy` must be one of
/// `reuse_current`, `new_worktree`, or `host_managed`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EnvironmentRequirements {
    pub repo_setup_profile: Option<String>,
    pub worktree_policy: String,
    pub command_gates: Vec<String>,
}
130
131impl Default for EnvironmentRequirements {
132 fn default() -> Self {
133 Self {
134 repo_setup_profile: None,
135 worktree_policy: "host_managed".to_string(),
136 command_gates: Vec::new(),
137 }
138 }
139}
140
/// Optional replay pins. When `graph_digest` or `workflow_version` are set,
/// validation checks them against the recomputed canonical values; `run_id`
/// and `event_ids` seed the run receipt.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleReplayMetadata {
    pub run_id: Option<String>,
    pub event_ids: Vec<String>,
    pub workflow_version: Option<usize>,
    pub graph_digest: Option<String>,
}
149
/// One validation finding. `severity` distinguishes errors from warnings
/// (values set by the `push_error`/`push_warning` helpers); `path` is a
/// dotted/bracketed locator into the bundle document.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleDiagnostic {
    pub severity: String,
    pub path: String,
    pub message: String,
    /// Workflow node the finding is anchored to, when one can be identified.
    pub node_id: Option<String>,
}
158
/// Result of `validate_workflow_bundle`. `valid` is true iff `errors` is
/// empty; warnings never affect it.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleValidationReport {
    pub valid: bool,
    pub bundle_id: String,
    pub workflow_id: String,
    /// "sha256:<hex>" digest of the canonicalized graph.
    pub graph_digest: String,
    pub errors: Vec<WorkflowBundleDiagnostic>,
    pub warnings: Vec<WorkflowBundleDiagnostic>,
}
169
/// Full human-facing preview of a bundle: validation outcome, exported
/// graph, mermaid rendering, and per-node summaries.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct WorkflowBundlePreview {
    pub schema_version: u32,
    pub bundle_id: String,
    pub bundle_version: String,
    pub workflow_id: String,
    pub workflow_version: usize,
    pub graph_digest: String,
    pub validation: WorkflowBundleValidationReport,
    pub graph: WorkflowBundleGraphExport,
    /// Copy of `graph.mermaid` for convenience.
    pub mermaid: String,
    pub triggers: Vec<WorkflowBundleTrigger>,
    pub connectors: Vec<ConnectorRequirement>,
    pub environment: EnvironmentRequirements,
    pub nodes: Vec<WorkflowBundlePreviewNode>,
    /// Workflow edges in canonical sorted order.
    pub edges: Vec<WorkflowEdge>,
}
187
/// Per-node summary used in previews: the node's kind/label plus its
/// attached prompt capsule, triggers, and sorted successor ids.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundlePreviewNode {
    pub id: String,
    pub kind: String,
    pub label: Option<String>,
    pub prompt_capsule: Option<String>,
    pub trigger_ids: Vec<String>,
    /// Deduplicated, sorted ids of nodes reachable by one outgoing edge.
    pub outgoing: Vec<String>,
}
197
/// Flat, deterministic graph export combining connector, trigger, policy,
/// workflow, and terminal pseudo-nodes, plus diagnostics and a mermaid
/// rendering. Nodes/edges/fields are sorted for stable output.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct WorkflowBundleGraphExport {
    pub schema_version: u32,
    pub graph_id: String,
    pub graph_digest: String,
    pub nodes: Vec<WorkflowBundleGraphNode>,
    pub edges: Vec<WorkflowBundleGraphEdge>,
    pub diagnostics: Vec<WorkflowBundleGraphDiagnostic>,
    /// Union of every node's editable fields, sorted by field id.
    pub editable_fields: Vec<WorkflowBundleEditableField>,
    pub mermaid: String,
}
209
/// Node in the exported graph. Ids are namespaced ("node/…", "trigger/…",
/// "connector/…", "policy/…", "terminal/…"); at most one of
/// `workflow_node_id` / `trigger_id` / `connector_id` back-references the
/// originating bundle element.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct WorkflowBundleGraphNode {
    pub id: String,
    pub node_type: String,
    pub label: String,
    pub workflow_node_id: Option<String>,
    pub trigger_id: Option<String>,
    pub connector_id: Option<String>,
    pub editable_fields: Vec<WorkflowBundleEditableField>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
221
/// Directed edge between exported graph nodes, referencing their
/// namespaced ids. `branch` labels conditional paths (e.g. "catchup",
/// "completed", "failed").
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleGraphEdge {
    pub from: String,
    pub to: String,
    pub label: Option<String>,
    pub branch: Option<String>,
}
229
/// Validation diagnostic projected onto the exported graph:
/// a `WorkflowBundleDiagnostic` plus the namespaced graph node id
/// ("node/<node_id>") when the finding is anchored to a workflow node.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleGraphDiagnostic {
    pub severity: String,
    pub path: String,
    pub message: String,
    pub node_id: Option<String>,
    pub graph_node_id: Option<String>,
}
238
/// Describes a user-editable field on a graph node: where it lives in the
/// bundle document (`json_pointer`), its type, and allowed enum values.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleEditableField {
    pub id: String,
    pub label: String,
    pub json_pointer: String,
    pub value_type: String,
    pub required: bool,
    /// Empty when the field is free-form rather than an enumeration.
    pub enum_values: Vec<String>,
}
248
/// Parameters for `run_workflow_bundle`. An unset `trigger_id` falls back
/// to the bundle's first declared trigger; `event_id` is appended to the
/// receipt's event list if not already recorded.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowBundleRunRequest {
    pub trigger_id: Option<String>,
    pub event_id: Option<String>,
}
255
/// Receipt emitted by `run_workflow_bundle`: identifies the bundle,
/// workflow version/digest, resolved trigger and events, and the nodes
/// walked in execution order. `receipt_type` is
/// [`WORKFLOW_BUNDLE_RECEIPT_TYPE`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct WorkflowBundleRunReceipt {
    pub schema_version: u32,
    pub receipt_type: String,
    pub bundle_id: String,
    pub bundle_version: String,
    pub workflow_id: String,
    pub workflow_version: usize,
    pub graph_digest: String,
    pub run_id: String,
    pub trigger_id: Option<String>,
    pub event_ids: Vec<String>,
    pub status: String,
    pub executed_nodes: Vec<WorkflowBundleRunNodeReceipt>,
    /// Policy/connector/environment snapshots captured at run time.
    pub policy: WorkflowBundlePolicy,
    pub connectors: Vec<ConnectorRequirement>,
    pub environment: EnvironmentRequirements,
}
274
/// Per-node entry in a run receipt: the node's kind, the prompt capsule
/// bound to it (if any), and its final status.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowBundleRunNodeReceipt {
    pub node_id: String,
    pub kind: String,
    pub prompt_capsule: Option<String>,
    pub status: String,
}
282
/// Error produced while loading or (de)serializing a workflow bundle.
/// Deliberately flat: just a human-readable message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WorkflowBundleError {
    pub message: String,
}

impl std::fmt::Display for WorkflowBundleError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate so formatter flags (width, alignment) still apply.
        std::fmt::Display::fmt(&self.message, f)
    }
}

impl std::error::Error for WorkflowBundleError {}

impl From<std::io::Error> for WorkflowBundleError {
    fn from(error: std::io::Error) -> Self {
        let message = error.to_string();
        Self { message }
    }
}
303
304impl From<serde_json::Error> for WorkflowBundleError {
305 fn from(error: serde_json::Error) -> Self {
306 Self {
307 message: error.to_string(),
308 }
309 }
310}
311
312pub fn load_workflow_bundle(path: &Path) -> Result<WorkflowBundle, WorkflowBundleError> {
313 let bytes = fs::read(path)?;
314 serde_json::from_slice(&bytes).map_err(Into::into)
315}
316
317pub fn workflow_graph_digest(graph: &WorkflowGraph) -> String {
318 let mut canonical = canonical_workflow_graph(graph);
319 canonical.audit_log.clear();
320 let bytes = serde_json::to_vec(&canonical).expect("workflow graph serializes");
321 let digest = Sha256::digest(bytes);
322 let hex = digest
323 .iter()
324 .map(|byte| format!("{byte:02x}"))
325 .collect::<String>();
326 format!("sha256:{hex}")
327}
328
/// Validates a bundle end to end: identity fields, triggers, prompt
/// capsules, policy, connectors, environment, the workflow graph itself,
/// and any replay pins under `receipts`.
///
/// Returns a report whose `valid` flag is true iff no errors were
/// recorded; warnings never affect `valid`.
pub fn validate_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundleValidationReport {
    // Validate against the canonicalized graph so defaulted fields
    // (entry node, node kinds, version) are filled in first.
    let canonical = canonical_workflow_graph(&bundle.workflow);
    let mut report = WorkflowBundleValidationReport {
        valid: true,
        bundle_id: bundle.id.clone(),
        workflow_id: canonical.id.clone(),
        graph_digest: workflow_graph_digest(&canonical),
        errors: Vec::new(),
        warnings: Vec::new(),
    };

    validate_bundle_identity(bundle, &canonical, &mut report);
    validate_triggers(bundle, &canonical, &mut report);
    validate_prompt_capsules(bundle, &canonical, &mut report);
    validate_policy(bundle, &mut report);
    validate_connectors(bundle, &mut report);
    validate_environment(bundle, &mut report);

    // Fold the generic workflow-graph validator's findings into this
    // report, recovering a node anchor from each message when possible.
    let graph_report = validate_workflow(&canonical, None);
    for error in graph_report.errors {
        let node_id = workflow_diagnostic_node_id(&error, &canonical);
        push_error(&mut report, "workflow", error, node_id);
    }
    for warning in graph_report.warnings {
        let node_id = workflow_diagnostic_node_id(&warning, &canonical);
        push_warning(&mut report, "workflow", warning, node_id);
    }

    // Replay pins: a pinned digest or version that disagrees with the
    // recomputed canonical values is an error.
    if let Some(expected) = bundle.receipts.graph_digest.as_deref() {
        if expected != report.graph_digest {
            let actual = report.graph_digest.clone();
            push_error(
                &mut report,
                "receipts.graph_digest",
                format!("graph digest mismatch: expected {expected}, computed {actual}"),
                None,
            );
        }
    }
    if let Some(expected_version) = bundle.receipts.workflow_version {
        if expected_version != canonical.version {
            push_error(
                &mut report,
                "receipts.workflow_version",
                format!(
                    "workflow version mismatch: expected {expected_version}, computed {}",
                    canonical.version
                ),
                None,
            );
        }
    }

    report.valid = report.errors.is_empty();
    report
}
385
386pub fn preview_workflow_bundle(bundle: &WorkflowBundle) -> WorkflowBundlePreview {
387 let canonical = canonical_workflow_graph(&bundle.workflow);
388 let validation = validate_workflow_bundle(bundle);
389 let graph = export_workflow_bundle_graph(bundle, &validation);
390 let mermaid = graph.mermaid.clone();
391 let triggers_by_node = triggers_by_node(bundle);
392 let capsules_by_node = capsules_by_node(bundle);
393 let mut nodes = Vec::new();
394
395 for (node_id, node) in &canonical.nodes {
396 let mut outgoing = canonical
397 .edges
398 .iter()
399 .filter(|edge| edge.from == *node_id)
400 .map(|edge| edge.to.clone())
401 .collect::<Vec<_>>();
402 outgoing.sort();
403 outgoing.dedup();
404 nodes.push(WorkflowBundlePreviewNode {
405 id: node_id.clone(),
406 kind: node.kind.clone(),
407 label: node.task_label.clone(),
408 prompt_capsule: capsules_by_node.get(node_id).cloned(),
409 trigger_ids: triggers_by_node.get(node_id).cloned().unwrap_or_default(),
410 outgoing,
411 });
412 }
413
414 WorkflowBundlePreview {
415 schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
416 bundle_id: bundle.id.clone(),
417 bundle_version: bundle.version.clone(),
418 workflow_id: canonical.id.clone(),
419 workflow_version: canonical.version,
420 graph_digest: validation.graph_digest.clone(),
421 validation,
422 graph,
423 mermaid,
424 triggers: bundle.triggers.clone(),
425 connectors: bundle.connectors.clone(),
426 environment: bundle.environment.clone(),
427 nodes,
428 edges: sorted_edges(&canonical),
429 }
430}
431
/// Exports the bundle as one flat, deterministic graph for visualization
/// and editing: connector, policy (catch-up / DLQ), trigger, workflow, and
/// terminal pseudo-nodes, with namespaced ids, sorted nodes/edges/fields,
/// projected diagnostics, and a mermaid rendering.
pub fn export_workflow_bundle_graph(
    bundle: &WorkflowBundle,
    validation: &WorkflowBundleValidationReport,
) -> WorkflowBundleGraphExport {
    let canonical = canonical_workflow_graph(&bundle.workflow);
    let mut nodes = Vec::new();
    let mut edges = Vec::new();
    let mut editable_fields = Vec::new();
    let capsules_by_node = capsules_by_node(bundle);
    // Policy switches deciding whether the catch-up / DLQ pseudo-nodes exist.
    let catchup_enabled = bundle.policy.catchup.mode != "none";
    let retry_can_dlq = bundle.policy.retry.max_attempts > 1;

    // One connector_call node per declared connector requirement.
    for (index, connector) in bundle.connectors.iter().enumerate() {
        let node_fields = connector_editable_fields(index, connector);
        editable_fields.extend(node_fields.clone());
        nodes.push(WorkflowBundleGraphNode {
            id: connector_graph_id(&connector.id),
            node_type: "connector_call".to_string(),
            label: connector_label(connector),
            workflow_node_id: None,
            trigger_id: None,
            connector_id: Some(connector.id.clone()),
            editable_fields: node_fields,
            metadata: BTreeMap::from([
                (
                    "provider_id".to_string(),
                    serde_json::json!(connector.provider_id),
                ),
                ("scopes".to_string(), serde_json::json!(connector.scopes)),
            ]),
        });
    }

    // Policy pseudo-nodes. Even when a node is omitted, its editable
    // fields are still exported so the policy remains editable.
    let catchup_fields = catchup_editable_fields();
    let retry_fields = retry_editable_fields();
    if catchup_enabled {
        let node_fields = catchup_fields.clone();
        editable_fields.extend(node_fields.clone());
        nodes.push(WorkflowBundleGraphNode {
            id: catchup_graph_id(),
            node_type: "catchup".to_string(),
            label: "Catch up".to_string(),
            workflow_node_id: None,
            trigger_id: None,
            connector_id: None,
            editable_fields: node_fields,
            metadata: BTreeMap::from([(
                "mode".to_string(),
                serde_json::json!(bundle.policy.catchup.mode),
            )]),
        });
    } else {
        editable_fields.extend(catchup_fields);
    }
    if retry_can_dlq {
        let node_fields = retry_fields.clone();
        editable_fields.extend(node_fields.clone());
        nodes.push(WorkflowBundleGraphNode {
            id: dlq_graph_id(),
            node_type: "dlq".to_string(),
            label: "Dead letter queue".to_string(),
            workflow_node_id: None,
            trigger_id: None,
            connector_id: None,
            editable_fields: node_fields,
            metadata: BTreeMap::from([(
                "max_attempts".to_string(),
                serde_json::json!(bundle.policy.retry.max_attempts),
            )]),
        });
    } else {
        editable_fields.extend(retry_fields);
    }

    // Trigger nodes, plus edges: connector --binds--> trigger, and
    // trigger --dispatch--> target node (optionally via the catch-up node).
    for (index, trigger) in bundle.triggers.iter().enumerate() {
        let node_fields = trigger_editable_fields(index, trigger);
        editable_fields.extend(node_fields.clone());
        nodes.push(WorkflowBundleGraphNode {
            id: trigger_graph_id(&trigger.id),
            node_type: "trigger".to_string(),
            label: trigger_label(trigger),
            workflow_node_id: trigger.node_id.clone(),
            trigger_id: Some(trigger.id.clone()),
            connector_id: None,
            editable_fields: node_fields,
            metadata: BTreeMap::from([
                ("kind".to_string(), serde_json::json!(trigger.kind)),
                ("provider".to_string(), serde_json::json!(trigger.provider)),
                ("events".to_string(), serde_json::json!(trigger.events)),
            ]),
        });
        // Bind the trigger to the first connector matching by provider_id
        // or by connector id.
        if let Some(provider) = trigger.provider.as_deref() {
            if let Some(connector) = bundle
                .connectors
                .iter()
                .find(|connector| connector.provider_id == provider || connector.id == provider)
            {
                edges.push(WorkflowBundleGraphEdge {
                    from: connector_graph_id(&connector.id),
                    to: trigger_graph_id(&trigger.id),
                    label: Some("binds".to_string()),
                    branch: None,
                });
            }
        }
        // A trigger without an explicit node dispatches to the graph entry.
        let target = trigger
            .node_id
            .clone()
            .unwrap_or_else(|| canonical.entry.clone());
        if catchup_enabled {
            edges.push(WorkflowBundleGraphEdge {
                from: trigger_graph_id(&trigger.id),
                to: catchup_graph_id(),
                label: Some(bundle.policy.catchup.mode.clone()),
                branch: Some("catchup".to_string()),
            });
            edges.push(WorkflowBundleGraphEdge {
                from: catchup_graph_id(),
                to: workflow_graph_id(&target),
                label: Some("dispatch".to_string()),
                branch: None,
            });
        } else {
            edges.push(WorkflowBundleGraphEdge {
                from: trigger_graph_id(&trigger.id),
                to: workflow_graph_id(&target),
                label: Some("dispatch".to_string()),
                branch: None,
            });
        }
    }

    // One graph node per workflow node, carrying its prompt capsule id.
    for (node_id, node) in &canonical.nodes {
        let capsule_id = capsules_by_node.get(node_id);
        let node_fields = workflow_node_editable_fields(node_id, capsule_id);
        editable_fields.extend(node_fields.clone());
        nodes.push(WorkflowBundleGraphNode {
            id: workflow_graph_id(node_id),
            node_type: workflow_node_type(&node.kind),
            label: workflow_node_label(node_id, node),
            workflow_node_id: Some(node_id.clone()),
            trigger_id: None,
            connector_id: None,
            editable_fields: node_fields,
            metadata: BTreeMap::from([
                ("kind".to_string(), serde_json::json!(node.kind)),
                ("task_label".to_string(), serde_json::json!(node.task_label)),
                (
                    "prompt_capsule".to_string(),
                    serde_json::json!(capsule_id.cloned()),
                ),
            ]),
        });
    }

    // Workflow edges, re-keyed to namespaced graph ids.
    for edge in sorted_edges(&canonical) {
        edges.push(WorkflowBundleGraphEdge {
            from: workflow_graph_id(&edge.from),
            to: workflow_graph_id(&edge.to),
            label: edge.label.clone(),
            branch: edge.branch.clone(),
        });
    }

    // Nodes with no outgoing workflow edges are treated as exits and wired
    // to the "completed" terminal. Note: the DLQ edge below is added for
    // EVERY workflow node (any node may exhaust retries), not just exits.
    let outgoing: BTreeSet<&str> = canonical
        .edges
        .iter()
        .map(|edge| edge.from.as_str())
        .collect();
    for node_id in canonical.nodes.keys() {
        if !outgoing.contains(node_id.as_str()) {
            edges.push(WorkflowBundleGraphEdge {
                from: workflow_graph_id(node_id),
                to: terminal_completed_graph_id(),
                label: Some("completed".to_string()),
                branch: Some("completed".to_string()),
            });
        }
        if retry_can_dlq {
            edges.push(WorkflowBundleGraphEdge {
                from: workflow_graph_id(node_id),
                to: dlq_graph_id(),
                label: Some("retry exhausted".to_string()),
                branch: Some("failed".to_string()),
            });
        }
    }

    // Both terminals are always emitted; "Failed" only receives an edge
    // when the retry policy can dead-letter, so it renders as an isolated
    // node otherwise — presumably intentional, confirm with the renderer.
    nodes.push(WorkflowBundleGraphNode {
        id: terminal_completed_graph_id(),
        node_type: "terminal".to_string(),
        label: "Completed".to_string(),
        workflow_node_id: None,
        trigger_id: None,
        connector_id: None,
        editable_fields: Vec::new(),
        metadata: BTreeMap::from([("status".to_string(), serde_json::json!("completed"))]),
    });
    nodes.push(WorkflowBundleGraphNode {
        id: terminal_failed_graph_id(),
        node_type: "terminal".to_string(),
        label: "Failed".to_string(),
        workflow_node_id: None,
        trigger_id: None,
        connector_id: None,
        editable_fields: Vec::new(),
        metadata: BTreeMap::from([("status".to_string(), serde_json::json!("failed"))]),
    });
    if retry_can_dlq {
        edges.push(WorkflowBundleGraphEdge {
            from: dlq_graph_id(),
            to: terminal_failed_graph_id(),
            label: Some("failed".to_string()),
            branch: Some("failed".to_string()),
        });
    }

    // Sort everything so the export (and anything derived from it) is
    // byte-stable across runs.
    nodes.sort_by(|left, right| left.id.cmp(&right.id));
    edges.sort_by(|left, right| {
        (&left.from, &left.to, &left.branch, &left.label).cmp(&(
            &right.from,
            &right.to,
            &right.branch,
            &right.label,
        ))
    });
    editable_fields.sort_by(|left, right| left.id.cmp(&right.id));

    // Project validation findings onto graph node ids (errors first).
    let diagnostics = validation
        .errors
        .iter()
        .chain(validation.warnings.iter())
        .map(|diagnostic| WorkflowBundleGraphDiagnostic {
            severity: diagnostic.severity.clone(),
            path: diagnostic.path.clone(),
            message: diagnostic.message.clone(),
            node_id: diagnostic.node_id.clone(),
            graph_node_id: diagnostic.node_id.as_deref().map(workflow_graph_id),
        })
        .collect::<Vec<_>>();
    let mermaid = render_workflow_bundle_mermaid(&nodes, &edges);

    WorkflowBundleGraphExport {
        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
        graph_id: canonical.id,
        graph_digest: validation.graph_digest.clone(),
        nodes,
        edges,
        diagnostics,
        editable_fields,
        mermaid,
    }
}
685
/// Produces a run receipt for the bundle.
///
/// Fails with the validation report (errors populated, `valid` false) when
/// the bundle is invalid or when `request.trigger_id` names an unknown
/// trigger. No node logic is executed here: every node returned by
/// `execution_order` is recorded with status "completed" unconditionally.
pub fn run_workflow_bundle(
    bundle: &WorkflowBundle,
    request: WorkflowBundleRunRequest,
) -> Result<WorkflowBundleRunReceipt, WorkflowBundleValidationReport> {
    let validation = validate_workflow_bundle(bundle);
    if !validation.valid {
        return Err(validation);
    }

    let canonical = canonical_workflow_graph(&bundle.workflow);
    // Resolve the trigger: an explicit unknown id is an error; no id falls
    // back to the bundle's first declared trigger (if any).
    let trigger_id = match request.trigger_id {
        Some(trigger_id)
            if !bundle
                .triggers
                .iter()
                .any(|trigger| trigger.id == trigger_id) =>
        {
            let mut report = validation;
            push_error(
                &mut report,
                "trigger_id",
                format!("unknown trigger id: {trigger_id}"),
                None,
            );
            report.valid = false;
            return Err(report);
        }
        Some(trigger_id) => Some(trigger_id),
        None => bundle.triggers.first().map(|trigger| trigger.id.clone()),
    };
    // Replay metadata seeds the event list; the request's event id is
    // appended only if not already recorded.
    let mut event_ids = bundle.receipts.event_ids.clone();
    if let Some(event_id) = request.event_id {
        if !event_ids.contains(&event_id) {
            event_ids.push(event_id);
        }
    }
    // Prefer the pinned replay run id; otherwise derive one from the
    // bundle and digest via `default_run_id`.
    let run_id = bundle
        .receipts
        .run_id
        .clone()
        .unwrap_or_else(|| default_run_id(bundle, &validation.graph_digest));
    let capsules_by_node = capsules_by_node(bundle);
    let executed_nodes = execution_order(&canonical)
        .into_iter()
        .map(|node_id| {
            // Safe: validation passed, so ordering only yields known nodes.
            let node = canonical
                .nodes
                .get(&node_id)
                .expect("execution order only contains known nodes");
            WorkflowBundleRunNodeReceipt {
                node_id: node_id.clone(),
                kind: node.kind.clone(),
                prompt_capsule: capsules_by_node.get(&node_id).cloned(),
                status: "completed".to_string(),
            }
        })
        .collect();

    Ok(WorkflowBundleRunReceipt {
        schema_version: WORKFLOW_BUNDLE_SCHEMA_VERSION,
        receipt_type: WORKFLOW_BUNDLE_RECEIPT_TYPE.to_string(),
        bundle_id: bundle.id.clone(),
        bundle_version: bundle.version.clone(),
        workflow_id: canonical.id,
        workflow_version: canonical.version,
        graph_digest: validation.graph_digest,
        run_id,
        trigger_id,
        event_ids,
        status: "completed".to_string(),
        executed_nodes,
        policy: bundle.policy.clone(),
        connectors: bundle.connectors.clone(),
        environment: bundle.environment.clone(),
    })
}
762
763fn canonical_workflow_graph(graph: &WorkflowGraph) -> WorkflowGraph {
764 let mut canonical = graph.clone();
765 if canonical.type_name.is_empty() {
766 canonical.type_name = "workflow_graph".to_string();
767 }
768 if canonical.version == 0 {
769 canonical.version = 1;
770 }
771 if canonical.entry.is_empty() {
772 canonical.entry = canonical.nodes.keys().next().cloned().unwrap_or_default();
773 }
774 for (node_id, node) in &mut canonical.nodes {
775 if node.id.is_none() {
776 node.id = Some(node_id.clone());
777 }
778 if node.kind.is_empty() {
779 node.kind = "stage".to_string();
780 }
781 if node.retry_policy.max_attempts == 0 {
782 node.retry_policy.max_attempts = 1;
783 }
784 }
785 canonical.edges = sorted_edges(&canonical);
786 canonical
787}
788
789fn sorted_edges(graph: &WorkflowGraph) -> Vec<WorkflowEdge> {
790 let mut edges = graph.edges.clone();
791 edges.sort_by(|left, right| {
792 (
793 &left.from,
794 &left.to,
795 left.branch.as_deref(),
796 left.label.as_deref(),
797 )
798 .cmp(&(
799 &right.from,
800 &right.to,
801 right.branch.as_deref(),
802 right.label.as_deref(),
803 ))
804 });
805 edges
806}
807
/// Checks bundle-level identity (schema version, non-blank bundle id and
/// version) and basic workflow identity (graph id, non-empty node set,
/// node `id` fields matching their map keys).
fn validate_bundle_identity(
    bundle: &WorkflowBundle,
    graph: &WorkflowGraph,
    report: &mut WorkflowBundleValidationReport,
) {
    if bundle.schema_version != WORKFLOW_BUNDLE_SCHEMA_VERSION {
        push_error(
            report,
            "schema_version",
            format!(
                "unsupported schema_version {}; expected {}",
                bundle.schema_version, WORKFLOW_BUNDLE_SCHEMA_VERSION
            ),
            None,
        );
    }
    if bundle.id.trim().is_empty() {
        push_error(report, "id", "bundle id is required", None);
    }
    if bundle.version.trim().is_empty() {
        push_error(report, "version", "bundle version is required", None);
    }
    if graph.id.trim().is_empty() {
        push_error(
            report,
            "workflow.id",
            "workflow id is required for portable bundles",
            None,
        );
    }
    if graph.nodes.is_empty() {
        push_error(
            report,
            "workflow.nodes",
            "workflow must contain nodes",
            None,
        );
    }
    for (node_id, node) in &graph.nodes {
        if node_id.trim().is_empty() {
            push_error(report, "workflow.nodes", "node id is required", None);
        }
        // Canonicalization fills `node.id` from the map key when absent, so
        // a mismatch here means the bundle explicitly disagreed with itself.
        if node.id.as_deref().is_some_and(|id| id != node_id) {
            push_error(
                report,
                format!("workflow.nodes.{node_id}.id"),
                "node id field must match its map key",
                Some(node_id.clone()),
            );
        }
    }
}
860
/// Validates trigger declarations: unique non-empty ids, kind-specific
/// required fields, and that any explicitly targeted workflow node exists.
/// A bundle with no triggers at all only warns.
fn validate_triggers(
    bundle: &WorkflowBundle,
    graph: &WorkflowGraph,
    report: &mut WorkflowBundleValidationReport,
) {
    if bundle.triggers.is_empty() {
        push_warning(report, "triggers", "bundle declares no triggers", None);
    }
    let mut ids = BTreeSet::new();
    for (index, trigger) in bundle.triggers.iter().enumerate() {
        let path = format!("triggers[{index}]");
        if trigger.id.trim().is_empty() {
            push_error(report, format!("{path}.id"), "trigger id is required", None);
        } else if !ids.insert(trigger.id.clone()) {
            push_error(
                report,
                format!("{path}.id"),
                format!("duplicate trigger id: {}", trigger.id),
                None,
            );
        }
        // Each trigger kind has its own mandatory configuration field.
        match trigger.kind.as_str() {
            "github" => {
                if trigger.provider.as_deref() != Some("github") {
                    push_error(
                        report,
                        format!("{path}.provider"),
                        "github triggers require provider=\"github\"",
                        None,
                    );
                }
                if trigger.events.is_empty() {
                    push_error(
                        report,
                        format!("{path}.events"),
                        "github triggers require at least one event",
                        None,
                    );
                }
            }
            "cron" if trigger.schedule.is_none() => push_error(
                report,
                format!("{path}.schedule"),
                "cron triggers require schedule",
                None,
            ),
            "delay" if trigger.delay.is_none() => push_error(
                report,
                format!("{path}.delay"),
                "delay triggers require delay",
                None,
            ),
            "webhook" if trigger.webhook_path.is_none() => push_error(
                report,
                format!("{path}.webhook_path"),
                "webhook triggers require webhook_path",
                None,
            ),
            "mcp" if trigger.mcp_tool.is_none() => push_error(
                report,
                format!("{path}.mcp_tool"),
                "mcp triggers require mcp_tool",
                None,
            ),
            "manual" => {}
            "" => push_error(
                report,
                format!("{path}.kind"),
                "trigger kind is required",
                None,
            ),
            // Unknown kinds are rejected here; known kinds whose guard
            // above failed (e.g. cron WITH a schedule) fall to `_`.
            other
                if !matches!(
                    other,
                    "github" | "cron" | "delay" | "webhook" | "mcp" | "manual"
                ) =>
            {
                push_error(
                    report,
                    format!("{path}.kind"),
                    format!("unsupported trigger kind: {other}"),
                    None,
                );
            }
            _ => {}
        }
        if let Some(node_id) = trigger.node_id.as_deref() {
            if !graph.nodes.contains_key(node_id) {
                push_error(
                    report,
                    format!("{path}.node_id"),
                    format!("trigger references unknown node: {node_id}"),
                    Some(node_id.to_string()),
                );
            }
        }
    }
}
959
/// Validates prompt capsules: ids match their map keys, prompts are
/// non-blank, referenced nodes and triggers exist, and no two capsules
/// target the same workflow node.
fn validate_prompt_capsules(
    bundle: &WorkflowBundle,
    graph: &WorkflowGraph,
    report: &mut WorkflowBundleValidationReport,
) {
    let trigger_ids: BTreeSet<&str> = bundle
        .triggers
        .iter()
        .map(|trigger| trigger.id.as_str())
        .collect();
    // Tracks node ids already claimed by a capsule to flag duplicates.
    let mut node_refs = BTreeSet::new();
    for (key, capsule) in &bundle.prompt_capsules {
        let path = format!("prompt_capsules.{key}");
        if capsule.id.trim().is_empty() {
            push_error(
                report,
                format!("{path}.id"),
                "prompt capsule id is required",
                None,
            );
        } else if capsule.id != *key {
            push_error(
                report,
                format!("{path}.id"),
                "prompt capsule id must match its map key",
                None,
            );
        }
        if capsule.prompt.trim().is_empty() {
            push_error(
                report,
                format!("{path}.prompt"),
                "prompt capsule prompt is required",
                Some(capsule.node_id.clone()),
            );
        }
        if !graph.nodes.contains_key(&capsule.node_id) {
            push_error(
                report,
                format!("{path}.node_id"),
                format!(
                    "prompt capsule references unknown node: {}",
                    capsule.node_id
                ),
                Some(capsule.node_id.clone()),
            );
        }
        // Empty node ids skip the duplicate check; the unknown-node error
        // above already covers them.
        if !capsule.node_id.is_empty() && !node_refs.insert(capsule.node_id.clone()) {
            push_error(
                report,
                format!("{path}.node_id"),
                format!("multiple prompt capsules target node {}", capsule.node_id),
                Some(capsule.node_id.clone()),
            );
        }
        if let Some(trigger_id) = capsule.trigger_id.as_deref() {
            if !trigger_ids.contains(trigger_id) {
                push_error(
                    report,
                    format!("{path}.trigger_id"),
                    format!("prompt capsule references unknown trigger: {trigger_id}"),
                    Some(capsule.node_id.clone()),
                );
            }
        }
    }
}
1027
1028fn validate_policy(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1029 if !matches!(
1030 bundle.policy.autonomy_tier.as_str(),
1031 "shadow" | "suggest" | "act_with_approval" | "act_auto"
1032 ) {
1033 push_error(
1034 report,
1035 "policy.autonomy_tier",
1036 "autonomy_tier must be shadow, suggest, act_with_approval, or act_auto",
1037 None,
1038 );
1039 }
1040 if bundle.policy.retry.max_attempts == 0 {
1041 push_error(
1042 report,
1043 "policy.retry.max_attempts",
1044 "retry.max_attempts must be at least 1",
1045 None,
1046 );
1047 }
1048 if !matches!(
1049 bundle.policy.catchup.mode.as_str(),
1050 "none" | "latest" | "all"
1051 ) {
1052 push_error(
1053 report,
1054 "policy.catchup.mode",
1055 "catchup.mode must be none, latest, or all",
1056 None,
1057 );
1058 }
1059}
1060
/// Validates connector requirements (unique non-empty ids, non-blank
/// provider_id) and warns when a trigger names a provider with no
/// matching connector requirement.
fn validate_connectors(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
    let mut ids = BTreeSet::new();
    let provider_ids: BTreeSet<&str> = bundle
        .connectors
        .iter()
        .map(|connector| connector.provider_id.as_str())
        .collect();
    for (index, connector) in bundle.connectors.iter().enumerate() {
        let path = format!("connectors[{index}]");
        if connector.id.trim().is_empty() {
            push_error(
                report,
                format!("{path}.id"),
                "connector id is required",
                None,
            );
        } else if !ids.insert(connector.id.clone()) {
            push_error(
                report,
                format!("{path}.id"),
                format!("duplicate connector id: {}", connector.id),
                None,
            );
        }
        if connector.provider_id.trim().is_empty() {
            push_error(
                report,
                format!("{path}.provider_id"),
                "connector provider_id is required",
                None,
            );
        }
    }
    // NOTE(review): export_workflow_bundle_graph binds triggers to
    // connectors by provider_id OR connector id, while this warning only
    // consults provider_id — confirm whether an id-matched connector
    // should suppress the warning.
    for trigger in &bundle.triggers {
        if let Some(provider) = trigger.provider.as_deref() {
            if !provider_ids.contains(provider) {
                push_warning(
                    report,
                    "connectors",
                    format!(
                        "trigger {} references provider {provider} with no connector requirement",
                        trigger.id
                    ),
                    trigger.node_id.clone(),
                );
            }
        }
    }
}
1110
1111fn validate_environment(bundle: &WorkflowBundle, report: &mut WorkflowBundleValidationReport) {
1112 if !matches!(
1113 bundle.environment.worktree_policy.as_str(),
1114 "reuse_current" | "new_worktree" | "host_managed"
1115 ) {
1116 push_error(
1117 report,
1118 "environment.worktree_policy",
1119 "worktree_policy must be reuse_current, new_worktree, or host_managed",
1120 None,
1121 );
1122 }
1123}
1124
1125fn workflow_diagnostic_node_id(message: &str, graph: &WorkflowGraph) -> Option<String> {
1126 for prefix in [
1127 "node is unreachable: ",
1128 "edge.from references unknown node: ",
1129 "edge.to references unknown node: ",
1130 "entry node does not exist: ",
1131 ] {
1132 if let Some(node_id) = message.strip_prefix(prefix) {
1133 return Some(node_id.to_string());
1134 }
1135 }
1136 if let Some(rest) = message.strip_prefix("node ") {
1137 if let Some((node_id, _)) = rest.split_once(':') {
1138 return Some(node_id.to_string());
1139 }
1140 }
1141 graph
1142 .nodes
1143 .keys()
1144 .find(|node_id| message.contains(&format!("node {node_id}:")))
1145 .cloned()
1146}
1147
/// Graph element id for a workflow node.
fn workflow_graph_id(node_id: &str) -> String {
    ["node/", node_id].concat()
}
1151
/// Graph element id for a trigger.
fn trigger_graph_id(trigger_id: &str) -> String {
    ["trigger/", trigger_id].concat()
}
1155
/// Graph element id for a connector requirement.
fn connector_graph_id(connector_id: &str) -> String {
    ["connector/", connector_id].concat()
}
1159
/// Fixed graph element id for the catchup policy node.
fn catchup_graph_id() -> String {
    String::from("policy/catchup")
}
1163
/// Fixed graph element id for the dead-letter-queue policy node.
fn dlq_graph_id() -> String {
    String::from("policy/dlq")
}
1167
/// Fixed graph element id for the "completed" terminal node.
fn terminal_completed_graph_id() -> String {
    String::from("terminal/completed")
}
1171
/// Fixed graph element id for the "failed" terminal node.
fn terminal_failed_graph_id() -> String {
    String::from("terminal/failed")
}
1175
/// Normalizes a raw node kind into the canonical node-type vocabulary used by
/// the bundle graph. Blank kinds default to "agent"; unrecognized non-blank
/// kinds pass through unchanged.
fn workflow_node_type(kind: &str) -> String {
    let canonical = match kind {
        "action" => "action",
        "stage" | "agent" => "agent",
        "subagent" | "worker" => "subagent",
        "wait" | "waitpoint" | "delay" => "wait",
        "approval" | "hitl" => "approval",
        "connector" | "connector_call" => "connector_call",
        "notification" | "notify" => "notification",
        "terminal" | "success" | "failure" => "terminal",
        other => {
            if other.trim().is_empty() {
                "agent"
            } else {
                other
            }
        }
    };
    canonical.to_string()
}
1191
1192fn workflow_node_label(node_id: &str, node: &super::WorkflowNode) -> String {
1193 node.task_label
1194 .clone()
1195 .or_else(|| node.prompt.clone())
1196 .map(|label| label.trim().to_string())
1197 .filter(|label| !label.is_empty())
1198 .unwrap_or_else(|| node_id.to_string())
1199}
1200
1201fn trigger_label(trigger: &WorkflowBundleTrigger) -> String {
1202 if !trigger.events.is_empty() {
1203 format!("{}: {}", trigger.kind, trigger.events.join(", "))
1204 } else if let Some(schedule) = trigger.schedule.as_deref() {
1205 format!("cron: {schedule}")
1206 } else if let Some(delay) = trigger.delay.as_deref() {
1207 format!("delay: {delay}")
1208 } else {
1209 trigger.id.clone()
1210 }
1211}
1212
1213fn connector_label(connector: &ConnectorRequirement) -> String {
1214 if connector.provider_id.is_empty() || connector.provider_id == connector.id {
1215 connector.id.clone()
1216 } else {
1217 format!("{} ({})", connector.id, connector.provider_id)
1218 }
1219}
1220
1221fn editable_field(
1222 id: impl Into<String>,
1223 label: impl Into<String>,
1224 json_pointer: impl Into<String>,
1225 value_type: impl Into<String>,
1226 required: bool,
1227 enum_values: &[&str],
1228) -> WorkflowBundleEditableField {
1229 WorkflowBundleEditableField {
1230 id: id.into(),
1231 label: label.into(),
1232 json_pointer: json_pointer.into(),
1233 value_type: value_type.into(),
1234 required,
1235 enum_values: enum_values
1236 .iter()
1237 .map(|value| (*value).to_string())
1238 .collect(),
1239 }
1240}
1241
/// Escapes a value for use as a single JSON Pointer (RFC 6901) segment:
/// `~` becomes `~0` and `/` becomes `~1`. Done in a single pass so the
/// tildes introduced by the `/` escape are never themselves re-escaped.
fn json_pointer_segment(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '~' => out.push_str("~0"),
            '/' => out.push_str("~1"),
            other => out.push(other),
        }
    }
    out
}
1245
1246fn trigger_editable_fields(
1247 index: usize,
1248 trigger: &WorkflowBundleTrigger,
1249) -> Vec<WorkflowBundleEditableField> {
1250 let base = format!("/triggers/{index}");
1251 let mut fields = vec![
1252 editable_field(
1253 format!("trigger.{}.kind", trigger.id),
1254 "Trigger kind",
1255 format!("{base}/kind"),
1256 "enum",
1257 true,
1258 &["github", "cron", "delay", "manual", "webhook", "mcp"],
1259 ),
1260 editable_field(
1261 format!("trigger.{}.node_id", trigger.id),
1262 "Target node",
1263 format!("{base}/node_id"),
1264 "string",
1265 false,
1266 &[],
1267 ),
1268 ];
1269 if trigger.provider.is_some() || trigger.kind == "github" {
1270 fields.push(editable_field(
1271 format!("trigger.{}.provider", trigger.id),
1272 "Provider",
1273 format!("{base}/provider"),
1274 "string",
1275 trigger.kind == "github",
1276 &[],
1277 ));
1278 }
1279 for (field, label, value_type) in [
1280 ("events", "Events", "list"),
1281 ("schedule", "Schedule", "string"),
1282 ("delay", "Delay", "string"),
1283 ("webhook_path", "Webhook path", "string"),
1284 ("mcp_tool", "MCP tool", "string"),
1285 ("resume_key", "Resume key", "string"),
1286 ("metadata", "Metadata", "object"),
1287 ] {
1288 fields.push(editable_field(
1289 format!("trigger.{}.{}", trigger.id, field),
1290 label,
1291 format!("{base}/{field}"),
1292 value_type,
1293 false,
1294 &[],
1295 ));
1296 }
1297 fields
1298}
1299
1300fn workflow_node_editable_fields(
1301 node_id: &str,
1302 capsule_id: Option<&String>,
1303) -> Vec<WorkflowBundleEditableField> {
1304 let escaped_node = json_pointer_segment(node_id);
1305 let mut fields = vec![
1306 editable_field(
1307 format!("workflow.{node_id}.task_label"),
1308 "Task label",
1309 format!("/workflow/nodes/{escaped_node}/task_label"),
1310 "string",
1311 false,
1312 &[],
1313 ),
1314 editable_field(
1315 format!("workflow.{node_id}.prompt"),
1316 "Prompt",
1317 format!("/workflow/nodes/{escaped_node}/prompt"),
1318 "string",
1319 false,
1320 &[],
1321 ),
1322 editable_field(
1323 format!("workflow.{node_id}.system"),
1324 "System prompt",
1325 format!("/workflow/nodes/{escaped_node}/system"),
1326 "string",
1327 false,
1328 &[],
1329 ),
1330 editable_field(
1331 format!("workflow.{node_id}.model_policy"),
1332 "Model policy",
1333 format!("/workflow/nodes/{escaped_node}/model_policy"),
1334 "object",
1335 false,
1336 &[],
1337 ),
1338 editable_field(
1339 format!("workflow.{node_id}.tools"),
1340 "Tool policy",
1341 format!("/workflow/nodes/{escaped_node}/tools"),
1342 "any",
1343 false,
1344 &[],
1345 ),
1346 editable_field(
1347 format!("workflow.{node_id}.capability_policy"),
1348 "Capability policy",
1349 format!("/workflow/nodes/{escaped_node}/capability_policy"),
1350 "object",
1351 false,
1352 &[],
1353 ),
1354 editable_field(
1355 format!("workflow.{node_id}.approval_policy"),
1356 "Approval policy",
1357 format!("/workflow/nodes/{escaped_node}/approval_policy"),
1358 "object",
1359 false,
1360 &[],
1361 ),
1362 editable_field(
1363 format!("workflow.{node_id}.retry_policy"),
1364 "Retry policy",
1365 format!("/workflow/nodes/{escaped_node}/retry_policy"),
1366 "object",
1367 false,
1368 &[],
1369 ),
1370 ];
1371 if let Some(capsule_id) = capsule_id {
1372 let escaped_capsule = json_pointer_segment(capsule_id);
1373 fields.extend([
1374 editable_field(
1375 format!("prompt_capsule.{capsule_id}.prompt"),
1376 "Prompt capsule",
1377 format!("/prompt_capsules/{escaped_capsule}/prompt"),
1378 "string",
1379 true,
1380 &[],
1381 ),
1382 editable_field(
1383 format!("prompt_capsule.{capsule_id}.system"),
1384 "Prompt capsule system",
1385 format!("/prompt_capsules/{escaped_capsule}/system"),
1386 "string",
1387 false,
1388 &[],
1389 ),
1390 editable_field(
1391 format!("prompt_capsule.{capsule_id}.context"),
1392 "Prompt capsule context",
1393 format!("/prompt_capsules/{escaped_capsule}/context"),
1394 "object",
1395 false,
1396 &[],
1397 ),
1398 editable_field(
1399 format!("prompt_capsule.{capsule_id}.trigger_id"),
1400 "Prompt capsule trigger",
1401 format!("/prompt_capsules/{escaped_capsule}/trigger_id"),
1402 "string",
1403 false,
1404 &[],
1405 ),
1406 ]);
1407 }
1408 fields
1409}
1410
1411fn connector_editable_fields(
1412 index: usize,
1413 connector: &ConnectorRequirement,
1414) -> Vec<WorkflowBundleEditableField> {
1415 let base = format!("/connectors/{index}");
1416 [
1417 ("id", "Connector id", "string", true),
1418 ("provider_id", "Provider id", "string", true),
1419 ("scopes", "Scopes", "list", false),
1420 ("setup_required", "Setup required", "bool", false),
1421 ("status_required", "Status required", "bool", false),
1422 ]
1423 .into_iter()
1424 .map(|(field, label, value_type, required)| {
1425 editable_field(
1426 format!("connector.{}.{}", connector.id, field),
1427 label,
1428 format!("{base}/{field}"),
1429 value_type,
1430 required,
1431 &[],
1432 )
1433 })
1434 .collect()
1435}
1436
1437fn retry_editable_fields() -> Vec<WorkflowBundleEditableField> {
1438 vec![
1439 editable_field(
1440 "policy.retry.max_attempts",
1441 "Retry attempts",
1442 "/policy/retry/max_attempts",
1443 "integer",
1444 true,
1445 &[],
1446 ),
1447 editable_field(
1448 "policy.retry.backoff",
1449 "Retry backoff",
1450 "/policy/retry/backoff",
1451 "string",
1452 true,
1453 &[],
1454 ),
1455 ]
1456}
1457
1458fn catchup_editable_fields() -> Vec<WorkflowBundleEditableField> {
1459 vec![
1460 editable_field(
1461 "policy.catchup.mode",
1462 "Catchup mode",
1463 "/policy/catchup/mode",
1464 "enum",
1465 true,
1466 &["none", "latest", "all"],
1467 ),
1468 editable_field(
1469 "policy.catchup.max_events",
1470 "Catchup max events",
1471 "/policy/catchup/max_events",
1472 "integer",
1473 false,
1474 &[],
1475 ),
1476 ]
1477}
1478
1479fn render_workflow_bundle_mermaid(
1480 nodes: &[WorkflowBundleGraphNode],
1481 edges: &[WorkflowBundleGraphEdge],
1482) -> String {
1483 let mut lines = vec!["flowchart TD".to_string()];
1484 for node in nodes {
1485 lines.push(format!(
1486 " {}[\"{}\"]",
1487 mermaid_id(&node.id),
1488 mermaid_label(&format!("{}: {}", node.node_type, node.label))
1489 ));
1490 }
1491 for edge in edges {
1492 let label = edge
1493 .label
1494 .as_deref()
1495 .or(edge.branch.as_deref())
1496 .map(mermaid_label);
1497 match label {
1498 Some(label) if !label.is_empty() => lines.push(format!(
1499 " {} -->|{}| {}",
1500 mermaid_id(&edge.from),
1501 label,
1502 mermaid_id(&edge.to)
1503 )),
1504 _ => lines.push(format!(
1505 " {} --> {}",
1506 mermaid_id(&edge.from),
1507 mermaid_id(&edge.to)
1508 )),
1509 }
1510 }
1511 lines.join("\n")
1512}
1513
1514fn mermaid_id(value: &str) -> String {
1515 let digest = Sha256::digest(value.as_bytes());
1516 let suffix = digest
1517 .iter()
1518 .take(4)
1519 .map(|byte| format!("{byte:02x}"))
1520 .collect::<String>();
1521 let mut out = format!("n_{suffix}_");
1522 for ch in value.chars() {
1523 if ch.is_ascii_alphanumeric() {
1524 out.push(ch);
1525 } else {
1526 out.push('_');
1527 }
1528 }
1529 out
1530}
1531
/// Escapes a label for embedding inside a double-quoted Mermaid string:
/// backslashes and quotes are backslash-escaped, newlines collapse to spaces.
fn mermaid_label(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push(' '),
            other => out.push(other),
        }
    }
    out
}
1538
1539fn triggers_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, Vec<String>> {
1540 let mut by_node: BTreeMap<String, Vec<String>> = BTreeMap::new();
1541 for trigger in &bundle.triggers {
1542 if let Some(node_id) = trigger.node_id.as_ref() {
1543 by_node
1544 .entry(node_id.clone())
1545 .or_default()
1546 .push(trigger.id.clone());
1547 }
1548 }
1549 by_node
1550}
1551
1552fn capsules_by_node(bundle: &WorkflowBundle) -> BTreeMap<String, String> {
1553 bundle
1554 .prompt_capsules
1555 .iter()
1556 .map(|(id, capsule)| (capsule.node_id.clone(), id.clone()))
1557 .collect()
1558}
1559
1560fn execution_order(graph: &WorkflowGraph) -> Vec<String> {
1561 let outgoing =
1562 graph
1563 .edges
1564 .iter()
1565 .fold(BTreeMap::<String, Vec<String>>::new(), |mut acc, edge| {
1566 acc.entry(edge.from.clone())
1567 .or_default()
1568 .push(edge.to.clone());
1569 acc
1570 });
1571 let mut seen = BTreeSet::new();
1572 let mut queue = VecDeque::from([graph.entry.clone()]);
1573 let mut order = Vec::new();
1574 while let Some(node_id) = queue.pop_front() {
1575 if !graph.nodes.contains_key(&node_id) || !seen.insert(node_id.clone()) {
1576 continue;
1577 }
1578 order.push(node_id.clone());
1579 if let Some(next) = outgoing.get(&node_id) {
1580 let mut next = next.clone();
1581 next.sort();
1582 for child in next {
1583 queue.push_back(child);
1584 }
1585 }
1586 }
1587 order
1588}
1589
1590fn default_run_id(bundle: &WorkflowBundle, graph_digest: &str) -> String {
1591 let suffix = graph_digest
1592 .strip_prefix("sha256:")
1593 .unwrap_or(graph_digest)
1594 .chars()
1595 .take(12)
1596 .collect::<String>();
1597 format!("bundle_run_{}_{}", sanitize_id(&bundle.id), suffix)
1598}
1599
/// Replaces every character outside `[A-Za-z0-9_-]` with `_`.
fn sanitize_id(value: &str) -> String {
    value
        .chars()
        .map(|ch| match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-' => ch,
            _ => '_',
        })
        .collect()
}
1612
1613fn push_error(
1614 report: &mut WorkflowBundleValidationReport,
1615 path: impl Into<String>,
1616 message: impl Into<String>,
1617 node_id: Option<String>,
1618) {
1619 report.errors.push(WorkflowBundleDiagnostic {
1620 severity: "error".to_string(),
1621 path: path.into(),
1622 message: message.into(),
1623 node_id,
1624 });
1625}
1626
1627fn push_warning(
1628 report: &mut WorkflowBundleValidationReport,
1629 path: impl Into<String>,
1630 message: impl Into<String>,
1631 node_id: Option<String>,
1632) {
1633 report.warnings.push(WorkflowBundleDiagnostic {
1634 severity: "warning".to_string(),
1635 path: path.into(),
1636 message: message.into(),
1637 node_id,
1638 });
1639}
1640
// Unit tests live in a sibling file (kept out of this module for size);
// compiled only under `cargo test`.
#[cfg(test)]
#[path = "workflow_bundle_tests.rs"]
mod workflow_bundle_tests;