1use std::collections::{BTreeMap, BTreeSet};
4
5use serde::{Deserialize, Serialize};
6
7use super::{
8 new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
9 BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
10 ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
11};
12use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
13use crate::tool_surface::{tool_capability_policy_from_spec, tool_names_from_spec};
14use crate::value::{VmError, VmValue};
15
/// Node metadata key under which [`inject_workflow_verification_contracts`]
/// stores the serialized list of verification contracts.
pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
/// Node metadata key read by [`stage_verification_contracts`]; the string value
/// `"local_only"` limits a stage to the contract derived from its own `verify` block.
pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
18
/// A single node of a [`WorkflowGraph`].
///
/// Every field falls back to its `Default` when missing from the serialized
/// form (`#[serde(default)]`). The `raw_*` fields are skipped by serde and hold
/// the original VM values captured by [`parse_workflow_node_value`].
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier; [`normalize_workflow_value`] fills it from the graph's node key when unset.
    pub id: Option<String>,
    /// Node kind. [`normalize_workflow_value`] defaults it to `"stage"`, and
    /// [`validate_workflow`] applies extra checks to `"condition"`, `"fork"`,
    /// `"join"`, `"map"` and `"reduce"` nodes.
    pub kind: String,
    /// Legacy `act`/`verify`/`repair` nodes inherit the workflow-level `mode` when this is unset.
    pub mode: Option<String>,
    pub prompt: Option<String>,
    pub system: Option<String>,
    pub task_label: Option<String>,
    /// Legacy `act`/`verify`/`repair` nodes inherit the workflow-level `done_sentinel` when unset.
    pub done_sentinel: Option<String>,
    /// JSON tool spec; used to derive the tool capability policy and as the
    /// fallback tools value when `raw_tools` is absent.
    pub tools: serde_json::Value,
    /// Model selection (`provider`, `model`, `model_tier`, `temperature`, `max_tokens`).
    pub model_policy: ModelPolicy,
    /// Compaction settings translated into agent-loop options.
    pub auto_compact: AutoCompactPolicy,
    #[serde(default)]
    pub output_visibility: Option<String>,
    pub context_policy: ContextPolicy,
    /// `max_attempts` is raised to `1` by [`normalize_workflow_value`] when it is `0`.
    pub retry_policy: RetryPolicy,
    /// Intersected with the validation ceiling and with the tool-derived policy.
    pub capability_policy: CapabilityPolicy,
    /// Forwarded to the agent loop as the `approval_policy` option.
    pub approval_policy: super::ToolApprovalPolicy,
    /// Input requirements (`min_inputs`, `max_inputs`, `input_kinds`, `require_transcript`).
    pub input_contract: StageContract,
    /// Output description; `output_kinds` is defaulted from `kind` during normalization.
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    /// `strategy` defaults to `"all"` during normalization.
    pub join_policy: JoinPolicy,
    /// `strategy` defaults to `"concat"` during normalization.
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification settings consumed by [`verification_contract_from_verify`].
    pub verify: Option<serde_json::Value>,
    #[serde(default)]
    pub exit_when_verified: bool,
    /// Free-form metadata; this file reads the verification contract/scope keys
    /// and `worker_session_id` from it.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Original VM `tools` value; preferred over `tools` when building stage options.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Original VM `auto_compact` value; read for `compact_keep_last`/`keep_last`,
    /// `summarize_prompt` and the callback entries.
    #[serde(skip)]
    pub raw_auto_compact: Option<VmValue>,
    /// Original VM `model_policy` value; its non-nil entries are copied into
    /// stage options, and `session_id` / `policy.command_policy` are read from it.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
    /// Original VM `context_assembler` value; when present, stage preparation
    /// passes it to the context assembler.
    #[serde(skip)]
    pub raw_context_assembler: Option<VmValue>,
}
79
80impl PartialEq for WorkflowNode {
81 fn eq(&self, other: &Self) -> bool {
82 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
83 }
84}
85
/// One concrete check belonging to a [`VerificationContract`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationRequirement {
    /// Check category, e.g. `"identifier"`, `"path"`, `"text"`, `"expect_status"`.
    pub kind: String,
    /// The value the check refers to (stored trimmed by `push_unique_requirement`).
    pub value: String,
    /// Optional human-readable explanation of the check.
    pub note: Option<String>,
}
93
/// Verification expectations collected from a node's `verify` block, an inline
/// `contract` object and/or a contract file.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationContract {
    /// Id of the node whose `verify` block produced this contract.
    pub source_node: Option<String>,
    pub summary: Option<String>,
    pub command: Option<String>,
    /// Expected exit status; mirrored into `checks` as an `"expect_status"` requirement.
    pub expect_status: Option<i64>,
    /// Mirrored into `checks` as a `"visible_text_contains"` requirement.
    pub assert_text: Option<String>,
    /// Mirrored into `checks` as a `"combined_output_contains"` requirement.
    pub expect_text: Option<String>,
    /// Mirrored into `checks` as `"identifier"` requirements.
    pub required_identifiers: Vec<String>,
    /// Mirrored into `checks` as `"path"` requirements.
    pub required_paths: Vec<String>,
    /// Mirrored into `checks` as `"text"` requirements.
    pub required_text: Vec<String>,
    pub notes: Vec<String>,
    /// De-duplicated list of explicit and derived requirements.
    pub checks: Vec<VerificationRequirement>,
}
109
110impl VerificationContract {
111 fn is_empty(&self) -> bool {
112 self.summary.is_none()
113 && self.command.is_none()
114 && self.expect_status.is_none()
115 && self.assert_text.is_none()
116 && self.expect_text.is_none()
117 && self.required_identifiers.is_empty()
118 && self.required_paths.is_empty()
119 && self.required_text.is_empty()
120 && self.notes.is_empty()
121 && self.checks.is_empty()
122 }
123}
124
/// Appends the trimmed form of `value` to `values` unless it is empty or an
/// identical entry is already present.
fn push_unique_string(values: &mut Vec<String>, value: &str) {
    let candidate = value.trim();
    let already_present = values
        .iter()
        .any(|existing| existing.as_str() == candidate);
    if !candidate.is_empty() && !already_present {
        values.push(candidate.to_owned());
    }
}
134
135fn push_unique_requirement(
136 values: &mut Vec<VerificationRequirement>,
137 kind: &str,
138 value: &str,
139 note: Option<&str>,
140) {
141 let trimmed_kind = kind.trim();
142 let trimmed_value = value.trim();
143 let trimmed_note = note
144 .map(str::trim)
145 .filter(|candidate| !candidate.is_empty())
146 .map(|candidate| candidate.to_string());
147 if trimmed_kind.is_empty() || trimmed_value.is_empty() {
148 return;
149 }
150 let candidate = VerificationRequirement {
151 kind: trimmed_kind.to_string(),
152 value: trimmed_value.to_string(),
153 note: trimmed_note,
154 };
155 if !values.iter().any(|existing| existing == &candidate) {
156 values.push(candidate);
157 }
158}
159
160fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
161 match value {
162 Some(serde_json::Value::String(text)) => {
163 let mut values = Vec::new();
164 push_unique_string(&mut values, text);
165 values
166 }
167 Some(serde_json::Value::Array(items)) => {
168 let mut values = Vec::new();
169 for item in items {
170 if let Some(text) = item.as_str() {
171 push_unique_string(&mut values, text);
172 }
173 }
174 values
175 }
176 _ => Vec::new(),
177 }
178}
179
180fn merge_verification_requirement_list(
181 target: &mut Vec<VerificationRequirement>,
182 value: Option<&serde_json::Value>,
183) {
184 let Some(items) = value.and_then(|raw| raw.as_array()) else {
185 return;
186 };
187 for item in items {
188 let Some(object) = item.as_object() else {
189 continue;
190 };
191 let kind = object
192 .get("kind")
193 .and_then(|value| value.as_str())
194 .unwrap_or_default();
195 let value = object
196 .get("value")
197 .and_then(|value| value.as_str())
198 .unwrap_or_default();
199 let note = object
200 .get("note")
201 .or_else(|| object.get("description"))
202 .or_else(|| object.get("reason"))
203 .and_then(|value| value.as_str());
204 push_unique_requirement(target, kind, value, note);
205 }
206}
207
208fn merge_verification_contract_fields(
209 target: &mut VerificationContract,
210 object: &serde_json::Map<String, serde_json::Value>,
211) {
212 if target.summary.is_none() {
213 target.summary = object
214 .get("summary")
215 .and_then(|value| value.as_str())
216 .map(str::trim)
217 .filter(|value| !value.is_empty())
218 .map(|value| value.to_string());
219 }
220 if target.command.is_none() {
221 target.command = object
222 .get("command")
223 .and_then(|value| value.as_str())
224 .map(str::trim)
225 .filter(|value| !value.is_empty())
226 .map(|value| value.to_string());
227 }
228 if target.expect_status.is_none() {
229 target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
230 }
231 if target.assert_text.is_none() {
232 target.assert_text = object
233 .get("assert_text")
234 .and_then(|value| value.as_str())
235 .map(str::trim)
236 .filter(|value| !value.is_empty())
237 .map(|value| value.to_string());
238 }
239 if target.expect_text.is_none() {
240 target.expect_text = object
241 .get("expect_text")
242 .and_then(|value| value.as_str())
243 .map(str::trim)
244 .filter(|value| !value.is_empty())
245 .map(|value| value.to_string());
246 }
247
248 for value in json_string_list(
249 object
250 .get("required_identifiers")
251 .or_else(|| object.get("identifiers")),
252 ) {
253 push_unique_string(&mut target.required_identifiers, &value);
254 }
255 for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
256 push_unique_string(&mut target.required_paths, &value);
257 }
258 for value in json_string_list(
259 object
260 .get("required_text")
261 .or_else(|| object.get("exact_text"))
262 .or_else(|| object.get("required_strings")),
263 ) {
264 push_unique_string(&mut target.required_text, &value);
265 }
266 for value in json_string_list(object.get("notes")) {
267 push_unique_string(&mut target.notes, &value);
268 }
269 merge_verification_requirement_list(&mut target.checks, object.get("checks"));
270}
271
272fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
273 let resolved = crate::stdlib::process::resolve_source_asset_path(path);
274 let contents = std::fs::read_to_string(&resolved).map_err(|error| {
275 VmError::Runtime(format!(
276 "workflow verification contract read failed for {}: {error}",
277 resolved.display()
278 ))
279 })?;
280 serde_json::from_str(&contents).map_err(|error| {
281 VmError::Runtime(format!(
282 "workflow verification contract parse failed for {}: {error}",
283 resolved.display()
284 ))
285 })
286}
287
288fn resolve_verification_contract_path(
289 verify: &serde_json::Map<String, serde_json::Value>,
290) -> Result<Option<serde_json::Value>, VmError> {
291 let Some(path) = verify
292 .get("contract_path")
293 .or_else(|| verify.get("verification_contract_path"))
294 .and_then(|value| value.as_str())
295 .map(str::trim)
296 .filter(|value| !value.is_empty())
297 else {
298 return Ok(None);
299 };
300 Ok(Some(load_verification_contract_file(path)?))
301}
302
303pub fn verification_contract_from_verify(
304 node_id: &str,
305 verify: Option<&serde_json::Value>,
306) -> Result<Option<VerificationContract>, VmError> {
307 let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
308 return Ok(None);
309 };
310
311 let mut contract = VerificationContract {
312 source_node: Some(node_id.to_string()),
313 ..Default::default()
314 };
315
316 if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
317 let Some(object) = file_contract.as_object() else {
318 return Err(VmError::Runtime(
319 "workflow verification contract file must parse to a JSON object".to_string(),
320 ));
321 };
322 merge_verification_contract_fields(&mut contract, object);
323 }
324
325 if let Some(inline_contract) = verify_object.get("contract") {
326 let Some(object) = inline_contract.as_object() else {
327 return Err(VmError::Runtime(
328 "workflow verify.contract must be an object".to_string(),
329 ));
330 };
331 merge_verification_contract_fields(&mut contract, object);
332 }
333
334 merge_verification_contract_fields(&mut contract, verify_object);
335
336 if let Some(assert_text) = contract.assert_text.clone() {
337 push_unique_requirement(
338 &mut contract.checks,
339 "visible_text_contains",
340 &assert_text,
341 Some("verify stage requires visible output to contain this text"),
342 );
343 }
344 if let Some(expect_text) = contract.expect_text.clone() {
345 push_unique_requirement(
346 &mut contract.checks,
347 "combined_output_contains",
348 &expect_text,
349 Some("verify command requires combined stdout/stderr to contain this text"),
350 );
351 }
352 if let Some(expect_status) = contract.expect_status {
353 push_unique_requirement(
354 &mut contract.checks,
355 "expect_status",
356 &expect_status.to_string(),
357 Some("verify command exit status must match exactly"),
358 );
359 }
360 for identifier in contract.required_identifiers.clone() {
361 push_unique_requirement(
362 &mut contract.checks,
363 "identifier",
364 &identifier,
365 Some("use this exact identifier spelling"),
366 );
367 }
368 for path in contract.required_paths.clone() {
369 push_unique_requirement(
370 &mut contract.checks,
371 "path",
372 &path,
373 Some("preserve this exact path"),
374 );
375 }
376 for text in contract.required_text.clone() {
377 push_unique_requirement(
378 &mut contract.checks,
379 "text",
380 &text,
381 Some("required exact text or wiring snippet"),
382 );
383 }
384
385 if contract.is_empty() {
386 return Ok(None);
387 }
388 Ok(Some(contract))
389}
390
391fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
392 if !values.iter().any(|existing| existing == &candidate) {
393 values.push(candidate);
394 }
395}
396
397pub fn workflow_verification_contracts(
398 graph: &WorkflowGraph,
399) -> Result<Vec<VerificationContract>, VmError> {
400 let mut contracts = Vec::new();
401 for (node_id, node) in &graph.nodes {
402 if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
403 push_unique_contract(&mut contracts, contract);
404 }
405 }
406 Ok(contracts)
407}
408
409pub fn inject_workflow_verification_contracts(
410 node: &mut WorkflowNode,
411 contracts: &[VerificationContract],
412) {
413 if contracts.is_empty() {
414 return;
415 }
416 node.metadata.insert(
417 WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
418 serde_json::to_value(contracts).unwrap_or_default(),
419 );
420}
421
422pub fn stage_verification_contracts(
423 node_id: &str,
424 node: &WorkflowNode,
425) -> Result<Vec<VerificationContract>, VmError> {
426 let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
427 let local_only = matches!(
428 node.metadata
429 .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
430 .and_then(|value| value.as_str()),
431 Some("local_only")
432 );
433 if local_only {
434 return Ok(local_contract.into_iter().collect());
435 }
436
437 let mut contracts = node
438 .metadata
439 .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
440 .cloned()
441 .map(|value| {
442 serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
443 VmError::Runtime(format!(
444 "workflow stage {node_id} verification contract metadata parse failed: {error}"
445 ))
446 })
447 })
448 .transpose()?
449 .unwrap_or_default();
450
451 if let Some(local_contract) = local_contract {
452 push_unique_contract(&mut contracts, local_contract);
453 }
454 Ok(contracts)
455}
456
/// A directed edge between two nodes of a [`WorkflowGraph`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the source node.
    pub from: String,
    /// Id of the target node.
    pub to: String,
    /// Optional branch name, e.g. `"true"`/`"false"` on condition nodes or
    /// `"failed"`/`"retry"` on the generated verify/repair loop.
    pub branch: Option<String>,
    pub label: Option<String>,
}
465
/// A workflow: nodes keyed by id, the edges between them, and graph-wide policy.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Serialized as `_type`; defaults to `"workflow_graph"` during normalization.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph id; generated with `new_id("workflow")` during normalization when empty.
    pub id: String,
    pub name: Option<String>,
    /// Normalization raises `0` to `1`.
    pub version: usize,
    /// Id of the node where traversal starts.
    pub entry: String,
    /// Nodes keyed by node id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    /// Graph-level capability policy, checked against the ceiling in [`validate_workflow`].
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}
482
/// One entry of a [`WorkflowGraph`]'s `audit_log`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    /// Name of the recorded operation.
    pub op: String,
    /// Node the operation applied to, if any.
    pub node_id: Option<String>,
    // NOTE(review): presumably an RFC 3339 timestamp (the file imports `now_rfc3339`) — confirm.
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
493
/// Result of [`validate_workflow`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// `true` when `errors` is empty.
    pub valid: bool,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
    /// Ids reached from the entry node by following edges, in sorted order.
    pub reachable_nodes: Vec<String>,
}
502
503pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
504 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
505 let dict = value.as_dict();
506 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
507 node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
508 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
509 node.raw_context_assembler = dict.and_then(|d| d.get("context_assembler")).cloned();
510 Ok(node)
511}
512
/// Parses a JSON value into a [`WorkflowNode`] by delegating to
/// `super::parse_json_payload`; `label` is passed through to that helper.
///
/// Unlike [`parse_workflow_node_value`], no VM values are available here, so
/// this function does not populate the `raw_*` fields itself.
///
/// # Errors
/// Propagates the error from `super::parse_json_payload`.
pub fn parse_workflow_node_json(
    json: serde_json::Value,
    label: &str,
) -> Result<WorkflowNode, VmError> {
    super::parse_json_payload(json, label)
}
519
/// Parses a JSON value into a [`WorkflowEdge`] by delegating to
/// `super::parse_json_payload`; `label` is passed through to that helper.
///
/// # Errors
/// Propagates the error from `super::parse_json_payload`.
pub fn parse_workflow_edge_json(
    json: serde_json::Value,
    label: &str,
) -> Result<WorkflowEdge, VmError> {
    super::parse_json_payload(json, label)
}
526
/// Parses a VM value into a [`WorkflowGraph`] and fills in defaults.
///
/// Steps, in order:
/// 1. Deserialize the value with `super::parse_json_value`.
/// 2. If the graph has no nodes, build them from the top-level `act`,
///    `verify` and `repair` keys, inheriting workflow-level model settings,
///    `mode` and `done_sentinel`, and generate the entry and default edges.
/// 3. Default `type_name`, `id`, `version` and `entry`.
/// 4. Apply per-node defaults (id, kind, join/reduce strategy, output kinds,
///    retry attempts) and recover `raw_tools` from the original VM dict.
///
/// # Errors
/// Propagates errors from `super::parse_json_value` and
/// [`parse_workflow_node_value`].
pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
    // Original VM dict (empty when `value` is not a dict); used for lookups
    // that need the raw VM values rather than their JSON projection.
    let as_dict = value.as_dict().cloned().unwrap_or_default();

    // Shorthand form: no explicit `nodes`, stages given as top-level keys.
    if graph.nodes.is_empty() {
        for key in ["act", "verify", "repair"] {
            if let Some(node_value) = as_dict.get(key) {
                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
                // The shorthand key always becomes the node id.
                node.id = Some(key.to_string());
                if node.kind.is_empty() {
                    node.kind = if key == "verify" {
                        "verify".to_string()
                    } else {
                        "stage".to_string()
                    };
                }
                // Workflow-level model settings act as fallbacks for each stage.
                if node.model_policy.provider.is_none() {
                    node.model_policy.provider = as_dict
                        .get("provider")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model.is_none() {
                    node.model_policy.model = as_dict
                        .get("model")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model_tier.is_none() {
                    // `tier` is accepted as an alias for `model_tier`.
                    node.model_policy.model_tier = as_dict
                        .get("model_tier")
                        .or_else(|| as_dict.get("tier"))
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.temperature.is_none() {
                    // Accept either a float or an integer temperature.
                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
                        if let VmValue::Float(number) = value {
                            Some(*number)
                        } else {
                            value.as_int().map(|number| number as f64)
                        }
                    });
                }
                if node.model_policy.max_tokens.is_none() {
                    node.model_policy.max_tokens =
                        as_dict.get("max_tokens").and_then(|value| value.as_int());
                }
                if node.mode.is_none() {
                    node.mode = as_dict
                        .get("mode")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.done_sentinel.is_none() {
                    node.done_sentinel = as_dict
                        .get("done_sentinel")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                // A `verify` stage may list its expectations directly on the
                // stage; fold them into a `verify` object when none was given.
                if key == "verify"
                    && node.verify.is_none()
                    && (raw_node.contains_key("assert_text")
                        || raw_node.contains_key("command")
                        || raw_node.contains_key("expect_status")
                        || raw_node.contains_key("expect_text"))
                {
                    node.verify = Some(serde_json::json!({
                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
                        "command": raw_node.get("command").map(vm_value_to_json),
                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
                    }));
                }
                graph.nodes.insert(key.to_string(), node);
            }
        }
        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
            graph.entry = "act".to_string();
        }
        // Default wiring: act -> verify, plus a verify <-> repair loop.
        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
            if graph.nodes.contains_key("verify") {
                graph.edges.push(WorkflowEdge {
                    from: "act".to_string(),
                    to: "verify".to_string(),
                    branch: None,
                    label: None,
                });
            }
            // NOTE(review): the repair edges are added even when no `verify`
            // node exists, in which case they reference an unknown node and
            // `validate_workflow` reports errors — confirm this is intended.
            if graph.nodes.contains_key("repair") {
                graph.edges.push(WorkflowEdge {
                    from: "verify".to_string(),
                    to: "repair".to_string(),
                    branch: Some("failed".to_string()),
                    label: None,
                });
                graph.edges.push(WorkflowEdge {
                    from: "repair".to_string(),
                    to: "verify".to_string(),
                    branch: Some("retry".to_string()),
                    label: None,
                });
            }
        }
    }

    // Graph-level defaults.
    if graph.type_name.is_empty() {
        graph.type_name = "workflow_graph".to_string();
    }
    if graph.id.is_empty() {
        graph.id = new_id("workflow");
    }
    if graph.version == 0 {
        graph.version = 1;
    }
    if graph.entry.is_empty() {
        // Fall back to the first node key in map order, or "act" for an empty graph.
        graph.entry = graph
            .nodes
            .keys()
            .next()
            .cloned()
            .unwrap_or_else(|| "act".to_string());
    }
    // Per-node defaults.
    for (node_id, node) in &mut graph.nodes {
        // `raw_tools` is skipped by serde, so nodes deserialized from `nodes`
        // need it recovered from the original VM dict.
        // NOTE(review): only `raw_tools` is recovered here; `raw_auto_compact`,
        // `raw_model_policy` and `raw_context_assembler` stay unset for nodes
        // declared under `nodes` — confirm whether that is intended.
        if node.raw_tools.is_none() {
            node.raw_tools = as_dict
                .get("nodes")
                .and_then(|nodes| nodes.as_dict())
                .and_then(|nodes| nodes.get(node_id))
                .and_then(|node_value| node_value.as_dict())
                .and_then(|raw_node| raw_node.get("tools"))
                .cloned();
        }
        if node.id.is_none() {
            node.id = Some(node_id.clone());
        }
        if node.kind.is_empty() {
            node.kind = "stage".to_string();
        }
        if node.join_policy.strategy.is_empty() {
            node.join_policy.strategy = "all".to_string();
        }
        if node.reduce_policy.strategy.is_empty() {
            node.reduce_policy.strategy = "concat".to_string();
        }
        // Default output kind depends on the node kind.
        if node.output_contract.output_kinds.is_empty() {
            node.output_contract.output_kinds = vec![match node.kind.as_str() {
                "verify" => "verification_result".to_string(),
                "reduce" => node
                    .reduce_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "summary".to_string()),
                "map" => node
                    .map_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "artifact".to_string()),
                "escalation" => "plan".to_string(),
                _ => "artifact".to_string(),
            }];
        }
        // Every node gets at least one attempt.
        if node.retry_policy.max_attempts == 0 {
            node.retry_policy.max_attempts = 1;
        }
    }
    Ok(graph)
}
696
/// Validates the structure of `graph` and returns a report of errors and warnings.
///
/// Checks, in the order their diagnostics are emitted:
/// - the entry node exists;
/// - every edge endpoint names a known node;
/// - unreachable nodes (warning);
/// - per-node rules: `min_inputs <= max_inputs`, and kind-specific rules for
///   `condition`, `fork`, `join`, `map` and `reduce` nodes;
/// - when `ceiling` is given, that the graph and node capability policies
///   intersect with it;
/// - diagnostics from `crate::tool_surface::validate_workflow_graph`.
///
/// The report is `valid` exactly when no errors were recorded.
pub fn validate_workflow(
    graph: &WorkflowGraph,
    ceiling: Option<&CapabilityPolicy>,
) -> WorkflowValidationReport {
    let mut errors = Vec::new();
    let mut warnings = Vec::new();

    if !graph.nodes.contains_key(&graph.entry) {
        errors.push(format!("entry node does not exist: {}", graph.entry));
    }

    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
    for edge in &graph.edges {
        if !node_ids.contains(&edge.from) {
            errors.push(format!("edge.from references unknown node: {}", edge.from));
        }
        if !node_ids.contains(&edge.to) {
            errors.push(format!("edge.to references unknown node: {}", edge.to));
        }
    }

    // Shadows the `reachable_nodes` function with its result for the rest of the body.
    let reachable_nodes = reachable_nodes(graph);
    for node_id in &node_ids {
        if !reachable_nodes.contains(node_id) {
            warnings.push(format!("node is unreachable: {node_id}"));
        }
    }

    for (node_id, node) in &graph.nodes {
        // Edge counts for this node, used by the kind-specific rules below.
        let incoming = graph
            .edges
            .iter()
            .filter(|edge| edge.to == *node_id)
            .count();
        let outgoing: Vec<&WorkflowEdge> = graph
            .edges
            .iter()
            .filter(|edge| edge.from == *node_id)
            .collect();
        if let Some(min_inputs) = node.input_contract.min_inputs {
            if let Some(max_inputs) = node.input_contract.max_inputs {
                if min_inputs > max_inputs {
                    errors.push(format!(
                        "node {node_id}: input contract min_inputs exceeds max_inputs"
                    ));
                }
            }
        }
        // Guarded arms fall through to `_` when their condition does not hold.
        match node.kind.as_str() {
            "condition" => {
                let has_true = outgoing
                    .iter()
                    .any(|edge| edge.branch.as_deref() == Some("true"));
                let has_false = outgoing
                    .iter()
                    .any(|edge| edge.branch.as_deref() == Some("false"));
                if !has_true || !has_false {
                    errors.push(format!(
                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
                    ));
                }
            }
            "fork" if outgoing.len() < 2 => {
                errors.push(format!(
                    "node {node_id}: fork nodes require at least two outgoing edges"
                ));
            }
            "join" if incoming < 2 => {
                warnings.push(format!(
                    "node {node_id}: join node has fewer than two incoming edges"
                ));
            }
            "map"
                if node.map_policy.items.is_empty()
                    && node.map_policy.item_artifact_kind.is_none()
                    && node.input_contract.input_kinds.is_empty() =>
            {
                errors.push(format!(
                    "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
                ));
            }
            "reduce" if node.input_contract.input_kinds.is_empty() => {
                warnings.push(format!(
                    "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
                ));
            }
            _ => {}
        }
    }

    // Only the failure of each intersection matters here; the result is discarded.
    if let Some(ceiling) = ceiling {
        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
            errors.push(error);
        }
        for (node_id, node) in &graph.nodes {
            if let Err(error) = ceiling.intersect(&node.capability_policy) {
                errors.push(format!("node {node_id}: {error}"));
            }
        }
    }

    // Tool-surface diagnostics are routed by severity.
    for diagnostic in crate::tool_surface::validate_workflow_graph(graph) {
        let message = format!("{}: {}", diagnostic.code, diagnostic.message);
        match diagnostic.severity {
            crate::tool_surface::ToolSurfaceSeverity::Error => errors.push(message),
            crate::tool_surface::ToolSurfaceSeverity::Warning => warnings.push(message),
        }
    }

    WorkflowValidationReport {
        valid: errors.is_empty(),
        errors,
        warnings,
        reachable_nodes: reachable_nodes.into_iter().collect(),
    }
}
813
814fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
815 let mut seen = BTreeSet::new();
816 let mut stack = vec![graph.entry.clone()];
817 while let Some(node_id) = stack.pop() {
818 if !seen.insert(node_id.clone()) {
819 continue;
820 }
821 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
822 stack.push(edge.to.clone());
823 }
824 }
825 seen
826}
827
828fn resolve_node_session_id(node: &WorkflowNode) -> String {
834 if let Some(explicit) = node
835 .raw_model_policy
836 .as_ref()
837 .and_then(|v| v.as_dict())
838 .and_then(|d| d.get("session_id"))
839 .and_then(|v| match v {
840 VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
841 _ => None,
842 })
843 {
844 return explicit;
845 }
846 if let Some(persisted) = node
847 .metadata
848 .get("worker_session_id")
849 .and_then(|value| value.as_str())
850 .filter(|value| !value.trim().is_empty())
851 {
852 return persisted.to_string();
853 }
854 format!("workflow_stage_{}", uuid::Uuid::now_v7())
855}
856
857fn raw_auto_compact_dict(
858 node: &WorkflowNode,
859) -> Option<&std::collections::BTreeMap<String, VmValue>> {
860 node.raw_auto_compact
861 .as_ref()
862 .and_then(|value| value.as_dict())
863}
864
865fn raw_auto_compact_int(node: &WorkflowNode, key: &str) -> Option<usize> {
866 raw_auto_compact_dict(node)
867 .and_then(|dict| dict.get(key))
868 .and_then(|value| value.as_int())
869 .filter(|value| *value >= 0)
870 .map(|value| value as usize)
871}
872
873fn raw_auto_compact_string(node: &WorkflowNode, key: &str) -> Option<String> {
874 raw_auto_compact_dict(node)
875 .and_then(|dict| dict.get(key))
876 .and_then(|value| match value {
877 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
878 _ => None,
879 })
880}
881
882fn raw_model_policy_dict(node: &WorkflowNode) -> Option<&BTreeMap<String, VmValue>> {
883 node.raw_model_policy
884 .as_ref()
885 .and_then(|value| value.as_dict())
886}
887
888fn insert_json_vm_option<T: Serialize>(
889 options: &mut BTreeMap<String, VmValue>,
890 key: &str,
891 value: &T,
892) -> Result<(), VmError> {
893 let json = serde_json::to_value(value).map_err(|error| {
894 VmError::Runtime(format!("workflow stage option encode error: {error}"))
895 })?;
896 options.insert(key.to_string(), crate::stdlib::json_to_vm_value(&json));
897 Ok(())
898}
899
900fn merge_raw_model_policy_options(options: &mut BTreeMap<String, VmValue>, node: &WorkflowNode) {
901 if let Some(raw) = raw_model_policy_dict(node) {
902 for (key, value) in raw {
903 if !matches!(value, VmValue::Nil) {
904 options.insert(key.clone(), value.clone());
905 }
906 }
907 }
908}
909
910fn preserve_nested_command_policy(options: &mut BTreeMap<String, VmValue>, node: &WorkflowNode) {
911 if options.contains_key("command_policy") {
912 return;
913 }
914 let Some(command_policy) = raw_model_policy_dict(node)
915 .and_then(|dict| dict.get("policy"))
916 .and_then(|value| value.as_dict())
917 .and_then(|policy| policy.get("command_policy"))
918 else {
919 return;
920 };
921 options.insert("command_policy".to_string(), command_policy.clone());
922}
923
924fn stage_tools_value(node: &WorkflowNode) -> Option<VmValue> {
925 node.raw_tools.clone().or_else(|| {
926 if matches!(node.tools, serde_json::Value::Null) {
927 None
928 } else {
929 Some(crate::stdlib::json_to_vm_value(&node.tools))
930 }
931 })
932}
933
934fn add_stage_tools_option(
935 options: &mut BTreeMap<String, VmValue>,
936 tools_value: &Option<VmValue>,
937 tool_names: &[String],
938) {
939 if !tool_names.is_empty() {
940 if let Some(value) = tools_value.clone() {
941 options.insert("tools".to_string(), value);
942 }
943 }
944}
945
946fn workflow_stage_llm_options(
947 node: &WorkflowNode,
948 stage_session_id: &str,
949 tools_value: &Option<VmValue>,
950 tool_names: &[String],
951 stage_agent_options: &super::WorkflowStageAgentOptions,
952) -> BTreeMap<String, VmValue> {
953 let mut options = stage_agent_options.llm_options_vm_dict();
954 merge_raw_model_policy_options(&mut options, node);
955 options.insert(
956 "session_id".to_string(),
957 VmValue::String(std::sync::Arc::from(stage_session_id.to_string())),
958 );
959 options.insert(
960 "tool_format".to_string(),
961 VmValue::String(std::sync::Arc::from(
962 stage_agent_options.tool_format.clone(),
963 )),
964 );
965 add_stage_tools_option(&mut options, tools_value, tool_names);
966 options
967}
968
969fn add_workflow_agent_compaction_options(
970 options: &mut BTreeMap<String, VmValue>,
971 node: &WorkflowNode,
972) {
973 if !node.auto_compact.enabled {
974 options.insert("auto_compact".to_string(), VmValue::Bool(false));
975 return;
976 }
977 options.insert("auto_compact".to_string(), VmValue::Bool(true));
978 if let Some(value) = node.auto_compact.token_threshold {
979 options.insert("compact_threshold".to_string(), VmValue::Int(value as i64));
980 }
981 if let Some(value) = node.auto_compact.tool_output_max_chars {
982 options.insert(
983 "tool_output_max_chars".to_string(),
984 VmValue::Int(value as i64),
985 );
986 }
987 if let Some(value) = node.auto_compact.hard_limit_tokens {
988 options.insert("hard_limit_tokens".to_string(), VmValue::Int(value as i64));
989 }
990 if let Some(strategy) = node.auto_compact.compact_strategy.as_ref() {
991 options.insert(
992 "compact_strategy".to_string(),
993 VmValue::String(std::sync::Arc::from(strategy.clone())),
994 );
995 }
996 if let Some(strategy) = node.auto_compact.hard_limit_strategy.as_ref() {
997 options.insert(
998 "hard_limit_strategy".to_string(),
999 VmValue::String(std::sync::Arc::from(strategy.clone())),
1000 );
1001 }
1002 if let Some(value) = raw_auto_compact_int(node, "compact_keep_last")
1003 .or_else(|| raw_auto_compact_int(node, "keep_last"))
1004 {
1005 options.insert("compact_keep_last".to_string(), VmValue::Int(value as i64));
1006 }
1007 if let Some(prompt) = raw_auto_compact_string(node, "summarize_prompt") {
1008 options.insert(
1009 "summarize_prompt".to_string(),
1010 VmValue::String(std::sync::Arc::from(prompt)),
1011 );
1012 }
1013 if let Some(dict) = raw_auto_compact_dict(node) {
1014 for key in ["compress_callback", "mask_callback"] {
1015 if let Some(callback) = dict.get(key) {
1016 options.insert(key.to_string(), callback.clone());
1017 }
1018 }
1019 if let Some(callback) = dict.get("custom_compactor") {
1020 options.insert("compact_callback".to_string(), callback.clone());
1021 }
1022 }
1023}
1024
1025fn workflow_stage_agent_loop_options(
1026 node: &WorkflowNode,
1027 stage_session_id: &str,
1028 tools_value: &Option<VmValue>,
1029 tool_names: &[String],
1030 stage_agent_options: &super::WorkflowStageAgentOptions,
1031) -> Result<BTreeMap<String, VmValue>, VmError> {
1032 let mut options = stage_agent_options.agent_loop_options_vm_dict();
1033 merge_raw_model_policy_options(&mut options, node);
1034 if let Some(context) = crate::orchestration::current_workflow_skill_context() {
1035 if !options.contains_key("skills") {
1036 if let Some(registry) = context.registry {
1037 options.insert("skills".to_string(), registry);
1038 }
1039 }
1040 if !options.contains_key("skill_match") {
1041 if let Some(match_config) = context.match_config {
1042 options.insert("skill_match".to_string(), match_config);
1043 }
1044 }
1045 }
1046 preserve_nested_command_policy(&mut options, node);
1047 add_workflow_agent_compaction_options(&mut options, node);
1048 add_stage_tools_option(&mut options, tools_value, tool_names);
1049 let tool_policy = tool_capability_policy_from_spec(&node.tools);
1050 let effective_policy = tool_policy
1051 .intersect(&node.capability_policy)
1052 .map_err(VmError::Runtime)?;
1053 insert_json_vm_option(&mut options, "policy", &effective_policy)?;
1054 insert_json_vm_option(&mut options, "approval_policy", &node.approval_policy)?;
1055 options.insert(
1056 "session_id".to_string(),
1057 VmValue::String(std::sync::Arc::from(stage_session_id.to_string())),
1058 );
1059 options.insert(
1060 "tool_format".to_string(),
1061 VmValue::String(std::sync::Arc::from(
1062 stage_agent_options.tool_format.clone(),
1063 )),
1064 );
1065 let stage_label = node
1066 .id
1067 .clone()
1068 .unwrap_or_else(|| stage_session_id.to_string());
1069 crate::orchestration::annotate_nested_execution_options(
1070 &mut options,
1071 crate::orchestration::NestedExecutionKind::WorkflowStage,
1072 &stage_label,
1073 );
1074 Ok(options)
1075}
1076
/// Everything `prepare_stage_node` resolves for a workflow stage before it is
/// executed and later completed by `complete_prepared_stage_node`.
#[derive(Clone, Debug)]
pub struct PreparedWorkflowStageNode {
    /// Prompt text produced by the stage prompt preparation step.
    pub prompt: String,
    /// System prompt, cloned from the node's `system` field.
    pub system: Option<String>,
    /// Whether the stage should run through the agent loop rather than a single
    /// LLM call. Always `false` when `result` is precomputed.
    pub run_agent_loop: bool,
    /// Options for the single LLM call path; empty for `verify` nodes.
    pub llm_options: BTreeMap<String, VmValue>,
    /// Options for the agent loop path; empty unless `run_agent_loop` is set.
    pub agent_loop_options: BTreeMap<String, VmValue>,
    /// Precomputed stage result. Set for `verify` nodes (command output or an
    /// empty completed payload); `None` for every other node kind.
    pub result: Option<serde_json::Value>,
    /// Artifacts selected as inputs for this stage.
    pub selected: Vec<ArtifactRecord>,
    /// Rendered context returned alongside the prompt.
    pub rendered_context: String,
    /// Rendered verification text returned alongside the prompt.
    pub rendered_verification: String,
    /// Verification contracts resolved for this stage.
    pub verification_contracts: Vec<VerificationContract>,
    /// Tool format copied from the stage agent options.
    pub tool_format: String,
    /// Session id resolved for the stage.
    pub stage_session_id: String,
}
1092
1093pub async fn prepare_stage_node(
1094 ctx: &crate::vm::AsyncBuiltinCtx,
1095 node_id: &str,
1096 node: &WorkflowNode,
1097 task: &str,
1098 artifacts: &[ArtifactRecord],
1099) -> Result<PreparedWorkflowStageNode, VmError> {
1100 let selected_stage = super::select_workflow_stage_artifacts(
1101 ctx,
1102 artifacts,
1103 &node.context_policy,
1104 &node.input_contract,
1105 )
1106 .await?;
1107 let selected = selected_stage.artifacts;
1108 let context_policy = selected_stage.context_policy;
1109 let rendered_context_override = if let Some(assembler) = node.raw_context_assembler.as_ref() {
1110 let assembled =
1111 crate::stdlib::assemble::assemble_from_options(ctx, &selected, assembler).await?;
1112 Some(super::render_assembled_chunks(&assembled))
1113 } else {
1114 None
1115 };
1116 let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1117 let stage_session_id = resolve_node_session_id(node);
1118 if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1119 return Err(VmError::Runtime(format!(
1120 "workflow stage {node_id} requires an existing session \
1121 (call agent_session_open and feed session_id through model_policy \
1122 before entering this stage)"
1123 )));
1124 }
1125 if let Some(min_inputs) = node.input_contract.min_inputs {
1126 if selected.len() < min_inputs {
1127 return Err(VmError::Runtime(format!(
1128 "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1129 )));
1130 }
1131 }
1132 if let Some(max_inputs) = node.input_contract.max_inputs {
1133 if selected.len() > max_inputs {
1134 return Err(VmError::Runtime(format!(
1135 "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1136 )));
1137 }
1138 }
1139 let prepared_prompt = super::prepare_workflow_stage_prompt(
1140 ctx,
1141 task,
1142 node.task_label.as_deref(),
1143 &selected,
1144 &context_policy,
1145 rendered_context_override.as_deref(),
1146 &verification_contracts,
1147 )
1148 .await?;
1149 let prompt = prepared_prompt.prompt;
1150 let rendered_context = prepared_prompt.rendered_context;
1151 let rendered_verification = prepared_prompt.rendered_verification;
1152
1153 let tool_names = tool_names_from_spec(&node.tools);
1154 let stage_agent_options = super::prepare_workflow_stage_agent_options(
1155 ctx,
1156 node,
1157 &stage_session_id,
1158 !tool_names.is_empty(),
1159 )
1160 .await?;
1161 let tool_format = stage_agent_options.tool_format.clone();
1162 let result = if node.kind == "verify" {
1163 if let Some(command) = node
1164 .verify
1165 .as_ref()
1166 .and_then(|verify| verify.as_object())
1167 .and_then(|verify| verify.get("command"))
1168 .and_then(|value| value.as_str())
1169 .map(str::trim)
1170 .filter(|value| !value.is_empty())
1171 {
1172 let (program, args) = if cfg!(target_os = "windows") {
1173 ("cmd", vec!["/C".to_string(), command.to_string()])
1174 } else {
1175 ("/bin/sh", vec!["-c".to_string(), command.to_string()])
1179 };
1180 let mut process_config = crate::stdlib::sandbox::ProcessCommandConfig {
1181 stdin_null: true,
1182 ..Default::default()
1183 };
1184 if let Some(context) = crate::stdlib::process::current_execution_context() {
1185 if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1186 crate::stdlib::sandbox::enforce_process_cwd(std::path::Path::new(&cwd))?;
1187 process_config.cwd = Some(std::path::PathBuf::from(cwd));
1188 }
1189 if !context.env.is_empty() {
1190 process_config.env.extend(context.env);
1191 }
1192 }
1193 let output = crate::stdlib::sandbox::command_output(program, &args, &process_config)?;
1194 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1195 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1196 let combined = if stderr.is_empty() {
1197 stdout.clone()
1198 } else if stdout.is_empty() {
1199 stderr.clone()
1200 } else {
1201 format!("{stdout}\n{stderr}")
1202 };
1203 serde_json::json!({
1204 "status": "completed",
1205 "text": combined,
1206 "visible_text": combined,
1207 "command": command,
1208 "stdout": stdout,
1209 "stderr": stderr,
1210 "exit_status": output.status.code().unwrap_or(-1),
1211 "success": output.status.success(),
1212 })
1213 } else {
1214 serde_json::json!({
1215 "status": "completed",
1216 "text": "",
1217 "visible_text": "",
1218 })
1219 }
1220 } else {
1221 let tools_value = stage_tools_value(node);
1222 let llm_options = workflow_stage_llm_options(
1223 node,
1224 &stage_session_id,
1225 &tools_value,
1226 &tool_names,
1227 &stage_agent_options,
1228 );
1229 let agent_loop_options = if stage_agent_options.run_agent_loop {
1230 workflow_stage_agent_loop_options(
1231 node,
1232 &stage_session_id,
1233 &tools_value,
1234 &tool_names,
1235 &stage_agent_options,
1236 )?
1237 } else {
1238 BTreeMap::new()
1239 };
1240 return Ok(PreparedWorkflowStageNode {
1241 prompt,
1242 system: node.system.clone(),
1243 run_agent_loop: stage_agent_options.run_agent_loop,
1244 llm_options,
1245 agent_loop_options,
1246 result: None,
1247 selected,
1248 rendered_context,
1249 rendered_verification,
1250 verification_contracts,
1251 tool_format,
1252 stage_session_id,
1253 });
1254 };
1255
1256 Ok(PreparedWorkflowStageNode {
1257 prompt,
1258 system: node.system.clone(),
1259 run_agent_loop: false,
1260 llm_options: BTreeMap::new(),
1261 agent_loop_options: BTreeMap::new(),
1262 result: Some(result),
1263 selected,
1264 rendered_context,
1265 rendered_verification,
1266 verification_contracts,
1267 tool_format,
1268 stage_session_id,
1269 })
1270}
1271
1272pub fn complete_prepared_stage_node(
1273 node_id: &str,
1274 node: &WorkflowNode,
1275 prepared: &PreparedWorkflowStageNode,
1276 mut llm_result: serde_json::Value,
1277) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1278 if let Some(payload) = llm_result.as_object_mut() {
1279 payload.insert(
1280 "prompt".to_string(),
1281 serde_json::json!(prepared.prompt.clone()),
1282 );
1283 payload.insert(
1284 "system_prompt".to_string(),
1285 serde_json::json!(node.system.clone().unwrap_or_default()),
1286 );
1287 payload.insert(
1288 "rendered_context".to_string(),
1289 serde_json::json!(prepared.rendered_context.clone()),
1290 );
1291 if !prepared.verification_contracts.is_empty() {
1292 payload.insert(
1293 "verification_contracts".to_string(),
1294 serde_json::to_value(&prepared.verification_contracts).unwrap_or_default(),
1295 );
1296 payload.insert(
1297 "rendered_verification_context".to_string(),
1298 serde_json::json!(prepared.rendered_verification.clone()),
1299 );
1300 }
1301 payload.insert(
1302 "selected_artifact_ids".to_string(),
1303 serde_json::json!(prepared
1304 .selected
1305 .iter()
1306 .map(|artifact| artifact.id.clone())
1307 .collect::<Vec<_>>()),
1308 );
1309 payload.insert(
1310 "selected_artifact_titles".to_string(),
1311 serde_json::json!(prepared
1312 .selected
1313 .iter()
1314 .map(|artifact| artifact.title.clone())
1315 .collect::<Vec<_>>()),
1316 );
1317 match payload
1318 .entry("tools".to_string())
1319 .or_insert_with(|| serde_json::json!({}))
1320 {
1321 serde_json::Value::Object(tools) => {
1322 tools.insert(
1323 "mode".to_string(),
1324 serde_json::json!(prepared.tool_format.clone()),
1325 );
1326 }
1327 slot => {
1328 *slot = serde_json::json!({ "mode": prepared.tool_format.clone() });
1329 }
1330 }
1331 }
1332
1333 let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1334 let result_transcript = llm_result
1338 .get("transcript")
1339 .cloned()
1340 .map(|value| crate::stdlib::json_to_vm_value(&value));
1341 let session_transcript = crate::agent_sessions::snapshot(&prepared.stage_session_id);
1342 let transcript = result_transcript
1343 .or(session_transcript)
1344 .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1345 let output_kind = node
1346 .output_contract
1347 .output_kinds
1348 .first()
1349 .cloned()
1350 .unwrap_or_else(|| {
1351 if node.kind == "verify" {
1352 "verification_result".to_string()
1353 } else {
1354 "artifact".to_string()
1355 }
1356 });
1357 let mut metadata = BTreeMap::new();
1358 metadata.insert(
1359 "input_artifact_ids".to_string(),
1360 serde_json::json!(prepared
1361 .selected
1362 .iter()
1363 .map(|artifact| artifact.id.clone())
1364 .collect::<Vec<_>>()),
1365 );
1366 metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1367 if !node.approval_policy.write_path_allowlist.is_empty() {
1368 metadata.insert(
1369 "changed_paths".to_string(),
1370 serde_json::json!(node.approval_policy.write_path_allowlist),
1371 );
1372 }
1373 let artifact = ArtifactRecord {
1374 type_name: "artifact".to_string(),
1375 id: new_id("artifact"),
1376 kind: output_kind,
1377 title: Some(format!("stage {node_id} output")),
1378 text: Some(visible_text),
1379 data: Some(llm_result.clone()),
1380 source: Some(node_id.to_string()),
1381 created_at: now_rfc3339(),
1382 freshness: Some("fresh".to_string()),
1383 priority: None,
1384 lineage: prepared
1385 .selected
1386 .iter()
1387 .map(|artifact| artifact.id.clone())
1388 .collect(),
1389 relevance: Some(1.0),
1390 estimated_tokens: None,
1391 stage: Some(node_id.to_string()),
1392 metadata,
1393 }
1394 .normalize();
1395
1396 Ok((llm_result, vec![artifact], transcript))
1397}
1398
1399pub async fn execute_stage_node(
1400 ctx: &crate::vm::AsyncBuiltinCtx,
1401 node_id: &str,
1402 node: &WorkflowNode,
1403 task: &str,
1404 artifacts: &[ArtifactRecord],
1405) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1406 let prepared = prepare_stage_node(ctx, node_id, node, task, artifacts).await?;
1407 let llm_result = if let Some(result) = prepared.result.clone() {
1408 result
1409 } else if prepared.run_agent_loop {
1410 let result = crate::stdlib::harn_entry::call_agent_loop(
1411 ctx,
1412 prepared.prompt.clone(),
1413 prepared.system.clone(),
1414 prepared.agent_loop_options.clone(),
1415 )
1416 .await?;
1417 crate::llm::vm_value_to_json(&result)
1418 } else {
1419 let args = vec![
1420 VmValue::String(std::sync::Arc::from(prepared.prompt.clone())),
1421 prepared
1422 .system
1423 .clone()
1424 .map(|s| VmValue::String(std::sync::Arc::from(s)))
1425 .unwrap_or(VmValue::Nil),
1426 VmValue::Dict(std::sync::Arc::new(prepared.llm_options.clone())),
1427 ];
1428 let opts = extract_llm_options(&args)?;
1429 let result = vm_call_llm_full(&opts).await?;
1430 crate::llm::agent_loop_result_from_llm(&result, opts)
1431 };
1432 complete_prepared_stage_node(node_id, node, &prepared, llm_result)
1433}
1434
1435pub fn append_audit_entry(
1436 graph: &mut WorkflowGraph,
1437 op: &str,
1438 node_id: Option<String>,
1439 reason: Option<String>,
1440 metadata: BTreeMap<String, serde_json::Value>,
1441) {
1442 graph.audit_log.push(WorkflowAuditEntry {
1443 id: new_id("audit"),
1444 op: op.to_string(),
1445 node_id,
1446 timestamp: now_rfc3339(),
1447 reason,
1448 metadata,
1449 });
1450}