1use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10 BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11 ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_surface::{tool_capability_policy_from_spec, tool_names_from_spec};
15use crate::value::{VmError, VmValue};
16
/// Node `metadata` key holding the workflow-wide list of verification contracts
/// (written by `inject_workflow_verification_contracts`, read by
/// `stage_verification_contracts`).
pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
/// Node `metadata` key selecting the verification scope; the string value `"local_only"`
/// makes `stage_verification_contracts` return only the node's own `verify` contract.
pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
19
/// A single node (stage) of a workflow graph.
///
/// The serde fields round-trip through JSON. The `raw_*` fields are `#[serde(skip)]`
/// and keep the original VM values. These may carry data, such as the auto-compaction
/// callbacks, that the JSON form cannot represent.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node id; `normalize_workflow_value` fills it from the map key when absent.
    pub id: Option<String>,
    /// Node kind. `validate_workflow` inspects "condition", "fork", "join", "map" and
    /// "reduce"; `normalize_workflow_value` defaults it to "stage" (or "verify" for the
    /// legacy `verify` node).
    pub kind: String,
    pub mode: Option<String>,
    pub prompt: Option<String>,
    pub system: Option<String>,
    pub task_label: Option<String>,
    pub done_sentinel: Option<String>,
    /// JSON tool specification; `Null` means no tools (see `stage_tools_value`).
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    /// Typed auto-compaction settings; extra keys are read from `raw_auto_compact`.
    pub auto_compact: AutoCompactPolicy,
    /// NOTE(review): presumably controls transcript/output visibility — confirm.
    #[serde(default)]
    pub output_visibility: Option<String>,
    pub context_policy: ContextPolicy,
    /// `normalize_workflow_value` raises `max_attempts` from 0 to 1.
    pub retry_policy: RetryPolicy,
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub input_contract: StageContract,
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification spec object, interpreted by `verification_contract_from_verify`.
    pub verify: Option<serde_json::Value>,
    #[serde(default)]
    pub exit_when_verified: bool,
    /// Free-form metadata; keys used in this module include the verification keys
    /// above and `worker_session_id`.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Original `tools` VM value; preferred over `tools` by `stage_tools_value`.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Original `auto_compact` VM value (source of keep_last, summarize_prompt, callbacks).
    #[serde(skip)]
    pub raw_auto_compact: Option<VmValue>,
    /// Original `model_policy` VM value; merged into LLM options and consulted for
    /// `session_id`.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
    /// Original `context_assembler` VM value, handed to the context assembler.
    #[serde(skip)]
    pub raw_context_assembler: Option<VmValue>,
}
80
81impl PartialEq for WorkflowNode {
82 fn eq(&self, other: &Self) -> bool {
83 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
84 }
85}
86
/// One concrete check in a verification contract, e.g. `kind = "path"` with the path as
/// `value`.
///
/// `push_unique_requirement` builds these from trimmed input and stores blank notes as
/// `None`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationRequirement {
    /// Check category, e.g. "identifier", "path", "text", "expect_status".
    pub kind: String,
    /// The value the check is about (text, path, identifier, status code as text).
    pub value: String,
    /// Optional human-readable explanation.
    pub note: Option<String>,
}
94
/// Verification expectations collected from a node's `verify` spec.
///
/// Merged from a contract file, an inline `contract` object and the top-level `verify`
/// fields (see `verification_contract_from_verify`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationContract {
    /// Id of the node whose `verify` spec produced this contract.
    pub source_node: Option<String>,
    pub summary: Option<String>,
    /// Command to run for verification.
    pub command: Option<String>,
    /// Exact exit status expected from `command`.
    pub expect_status: Option<i64>,
    /// Text the visible output must contain.
    pub assert_text: Option<String>,
    /// Text the command's combined stdout/stderr must contain.
    pub expect_text: Option<String>,
    pub required_identifiers: Vec<String>,
    pub required_paths: Vec<String>,
    pub required_text: Vec<String>,
    pub notes: Vec<String>,
    /// Explicit checks plus those derived from the scalar and list fields above.
    pub checks: Vec<VerificationRequirement>,
}
110
111impl VerificationContract {
112 fn is_empty(&self) -> bool {
113 self.summary.is_none()
114 && self.command.is_none()
115 && self.expect_status.is_none()
116 && self.assert_text.is_none()
117 && self.expect_text.is_none()
118 && self.required_identifiers.is_empty()
119 && self.required_paths.is_empty()
120 && self.required_text.is_empty()
121 && self.notes.is_empty()
122 && self.checks.is_empty()
123 }
124}
125
/// Appends `value` (trimmed) to `values` unless it is blank or already present.
fn push_unique_string(values: &mut Vec<String>, value: &str) {
    let value = value.trim();
    let is_new = !value.is_empty() && !values.iter().any(|existing| existing == value);
    if is_new {
        values.push(value.to_owned());
    }
}
135
136fn push_unique_requirement(
137 values: &mut Vec<VerificationRequirement>,
138 kind: &str,
139 value: &str,
140 note: Option<&str>,
141) {
142 let trimmed_kind = kind.trim();
143 let trimmed_value = value.trim();
144 let trimmed_note = note
145 .map(str::trim)
146 .filter(|candidate| !candidate.is_empty())
147 .map(|candidate| candidate.to_string());
148 if trimmed_kind.is_empty() || trimmed_value.is_empty() {
149 return;
150 }
151 let candidate = VerificationRequirement {
152 kind: trimmed_kind.to_string(),
153 value: trimmed_value.to_string(),
154 note: trimmed_note,
155 };
156 if !values.iter().any(|existing| existing == &candidate) {
157 values.push(candidate);
158 }
159}
160
161fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
162 match value {
163 Some(serde_json::Value::String(text)) => {
164 let mut values = Vec::new();
165 push_unique_string(&mut values, text);
166 values
167 }
168 Some(serde_json::Value::Array(items)) => {
169 let mut values = Vec::new();
170 for item in items {
171 if let Some(text) = item.as_str() {
172 push_unique_string(&mut values, text);
173 }
174 }
175 values
176 }
177 _ => Vec::new(),
178 }
179}
180
181fn merge_verification_requirement_list(
182 target: &mut Vec<VerificationRequirement>,
183 value: Option<&serde_json::Value>,
184) {
185 let Some(items) = value.and_then(|raw| raw.as_array()) else {
186 return;
187 };
188 for item in items {
189 let Some(object) = item.as_object() else {
190 continue;
191 };
192 let kind = object
193 .get("kind")
194 .and_then(|value| value.as_str())
195 .unwrap_or_default();
196 let value = object
197 .get("value")
198 .and_then(|value| value.as_str())
199 .unwrap_or_default();
200 let note = object
201 .get("note")
202 .or_else(|| object.get("description"))
203 .or_else(|| object.get("reason"))
204 .and_then(|value| value.as_str());
205 push_unique_requirement(target, kind, value, note);
206 }
207}
208
209fn merge_verification_contract_fields(
210 target: &mut VerificationContract,
211 object: &serde_json::Map<String, serde_json::Value>,
212) {
213 if target.summary.is_none() {
214 target.summary = object
215 .get("summary")
216 .and_then(|value| value.as_str())
217 .map(str::trim)
218 .filter(|value| !value.is_empty())
219 .map(|value| value.to_string());
220 }
221 if target.command.is_none() {
222 target.command = object
223 .get("command")
224 .and_then(|value| value.as_str())
225 .map(str::trim)
226 .filter(|value| !value.is_empty())
227 .map(|value| value.to_string());
228 }
229 if target.expect_status.is_none() {
230 target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
231 }
232 if target.assert_text.is_none() {
233 target.assert_text = object
234 .get("assert_text")
235 .and_then(|value| value.as_str())
236 .map(str::trim)
237 .filter(|value| !value.is_empty())
238 .map(|value| value.to_string());
239 }
240 if target.expect_text.is_none() {
241 target.expect_text = object
242 .get("expect_text")
243 .and_then(|value| value.as_str())
244 .map(str::trim)
245 .filter(|value| !value.is_empty())
246 .map(|value| value.to_string());
247 }
248
249 for value in json_string_list(
250 object
251 .get("required_identifiers")
252 .or_else(|| object.get("identifiers")),
253 ) {
254 push_unique_string(&mut target.required_identifiers, &value);
255 }
256 for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
257 push_unique_string(&mut target.required_paths, &value);
258 }
259 for value in json_string_list(
260 object
261 .get("required_text")
262 .or_else(|| object.get("exact_text"))
263 .or_else(|| object.get("required_strings")),
264 ) {
265 push_unique_string(&mut target.required_text, &value);
266 }
267 for value in json_string_list(object.get("notes")) {
268 push_unique_string(&mut target.notes, &value);
269 }
270 merge_verification_requirement_list(&mut target.checks, object.get("checks"));
271}
272
273fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
274 let resolved = crate::stdlib::process::resolve_source_asset_path(path);
275 let contents = std::fs::read_to_string(&resolved).map_err(|error| {
276 VmError::Runtime(format!(
277 "workflow verification contract read failed for {}: {error}",
278 resolved.display()
279 ))
280 })?;
281 serde_json::from_str(&contents).map_err(|error| {
282 VmError::Runtime(format!(
283 "workflow verification contract parse failed for {}: {error}",
284 resolved.display()
285 ))
286 })
287}
288
289fn resolve_verification_contract_path(
290 verify: &serde_json::Map<String, serde_json::Value>,
291) -> Result<Option<serde_json::Value>, VmError> {
292 let Some(path) = verify
293 .get("contract_path")
294 .or_else(|| verify.get("verification_contract_path"))
295 .and_then(|value| value.as_str())
296 .map(str::trim)
297 .filter(|value| !value.is_empty())
298 else {
299 return Ok(None);
300 };
301 Ok(Some(load_verification_contract_file(path)?))
302}
303
304pub fn verification_contract_from_verify(
305 node_id: &str,
306 verify: Option<&serde_json::Value>,
307) -> Result<Option<VerificationContract>, VmError> {
308 let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
309 return Ok(None);
310 };
311
312 let mut contract = VerificationContract {
313 source_node: Some(node_id.to_string()),
314 ..Default::default()
315 };
316
317 if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
318 let Some(object) = file_contract.as_object() else {
319 return Err(VmError::Runtime(
320 "workflow verification contract file must parse to a JSON object".to_string(),
321 ));
322 };
323 merge_verification_contract_fields(&mut contract, object);
324 }
325
326 if let Some(inline_contract) = verify_object.get("contract") {
327 let Some(object) = inline_contract.as_object() else {
328 return Err(VmError::Runtime(
329 "workflow verify.contract must be an object".to_string(),
330 ));
331 };
332 merge_verification_contract_fields(&mut contract, object);
333 }
334
335 merge_verification_contract_fields(&mut contract, verify_object);
336
337 if let Some(assert_text) = contract.assert_text.clone() {
338 push_unique_requirement(
339 &mut contract.checks,
340 "visible_text_contains",
341 &assert_text,
342 Some("verify stage requires visible output to contain this text"),
343 );
344 }
345 if let Some(expect_text) = contract.expect_text.clone() {
346 push_unique_requirement(
347 &mut contract.checks,
348 "combined_output_contains",
349 &expect_text,
350 Some("verify command requires combined stdout/stderr to contain this text"),
351 );
352 }
353 if let Some(expect_status) = contract.expect_status {
354 push_unique_requirement(
355 &mut contract.checks,
356 "expect_status",
357 &expect_status.to_string(),
358 Some("verify command exit status must match exactly"),
359 );
360 }
361 for identifier in contract.required_identifiers.clone() {
362 push_unique_requirement(
363 &mut contract.checks,
364 "identifier",
365 &identifier,
366 Some("use this exact identifier spelling"),
367 );
368 }
369 for path in contract.required_paths.clone() {
370 push_unique_requirement(
371 &mut contract.checks,
372 "path",
373 &path,
374 Some("preserve this exact path"),
375 );
376 }
377 for text in contract.required_text.clone() {
378 push_unique_requirement(
379 &mut contract.checks,
380 "text",
381 &text,
382 Some("required exact text or wiring snippet"),
383 );
384 }
385
386 if contract.is_empty() {
387 return Ok(None);
388 }
389 Ok(Some(contract))
390}
391
392fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
393 if !values.iter().any(|existing| existing == &candidate) {
394 values.push(candidate);
395 }
396}
397
398pub fn workflow_verification_contracts(
399 graph: &WorkflowGraph,
400) -> Result<Vec<VerificationContract>, VmError> {
401 let mut contracts = Vec::new();
402 for (node_id, node) in &graph.nodes {
403 if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
404 push_unique_contract(&mut contracts, contract);
405 }
406 }
407 Ok(contracts)
408}
409
410pub fn inject_workflow_verification_contracts(
411 node: &mut WorkflowNode,
412 contracts: &[VerificationContract],
413) {
414 if contracts.is_empty() {
415 return;
416 }
417 node.metadata.insert(
418 WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
419 serde_json::to_value(contracts).unwrap_or_default(),
420 );
421}
422
423pub fn stage_verification_contracts(
424 node_id: &str,
425 node: &WorkflowNode,
426) -> Result<Vec<VerificationContract>, VmError> {
427 let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
428 let local_only = matches!(
429 node.metadata
430 .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
431 .and_then(|value| value.as_str()),
432 Some("local_only")
433 );
434 if local_only {
435 return Ok(local_contract.into_iter().collect());
436 }
437
438 let mut contracts = node
439 .metadata
440 .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
441 .cloned()
442 .map(|value| {
443 serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
444 VmError::Runtime(format!(
445 "workflow stage {node_id} verification contract metadata parse failed: {error}"
446 ))
447 })
448 })
449 .transpose()?
450 .unwrap_or_default();
451
452 if let Some(local_contract) = local_contract {
453 push_unique_contract(&mut contracts, local_contract);
454 }
455 Ok(contracts)
456}
457
/// A directed edge between two workflow nodes, identified by node id.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    pub from: String,
    pub to: String,
    /// Optional branch label. Condition nodes use "true"/"false"; the legacy
    /// orchestration edges use "failed"/"retry".
    pub branch: Option<String>,
    pub label: Option<String>,
}
466
/// A workflow graph: nodes keyed by id, edges between them, and graph-wide policy.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Serialized as `_type`; `normalize_workflow_value` defaults it to "workflow_graph".
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    /// `normalize_workflow_value` raises 0 to 1.
    pub version: usize,
    /// Id of the node where execution starts.
    pub entry: String,
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    /// Graph-level capability policy; `validate_workflow` checks it against a ceiling.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}
483
/// One entry in a workflow graph's audit log.
///
/// NOTE(review): `op` presumably names the graph mutation recorded — confirm at the
/// writers of `audit_log`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    pub op: String,
    /// Node the operation concerned, if any.
    pub node_id: Option<String>,
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
494
/// Result of `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// `true` exactly when `errors` is empty.
    pub valid: bool,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
    /// Node ids reachable from the entry node.
    pub reachable_nodes: Vec<String>,
}
503
504pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
505 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
506 let dict = value.as_dict();
507 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
508 node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
509 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
510 node.raw_context_assembler = dict.and_then(|d| d.get("context_assembler")).cloned();
511 Ok(node)
512}
513
514pub fn parse_workflow_node_json(
515 json: serde_json::Value,
516 label: &str,
517) -> Result<WorkflowNode, VmError> {
518 super::parse_json_payload(json, label)
519}
520
521pub fn parse_workflow_edge_json(
522 json: serde_json::Value,
523 label: &str,
524) -> Result<WorkflowEdge, VmError> {
525 super::parse_json_payload(json, label)
526}
527
528pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
529 let mut graph: WorkflowGraph = super::parse_json_value(value)?;
530 let as_dict = value.as_dict().cloned().unwrap_or_default();
531
532 if graph.nodes.is_empty() {
533 for key in ["act", "verify", "repair"] {
534 if let Some(node_value) = as_dict.get(key) {
535 let mut node = parse_workflow_node_value(node_value, "orchestration")?;
536 let raw_node = node_value.as_dict().cloned().unwrap_or_default();
537 node.id = Some(key.to_string());
538 if node.kind.is_empty() {
539 node.kind = if key == "verify" {
540 "verify".to_string()
541 } else {
542 "stage".to_string()
543 };
544 }
545 if node.model_policy.provider.is_none() {
546 node.model_policy.provider = as_dict
547 .get("provider")
548 .map(|value| value.display())
549 .filter(|value| !value.is_empty());
550 }
551 if node.model_policy.model.is_none() {
552 node.model_policy.model = as_dict
553 .get("model")
554 .map(|value| value.display())
555 .filter(|value| !value.is_empty());
556 }
557 if node.model_policy.model_tier.is_none() {
558 node.model_policy.model_tier = as_dict
559 .get("model_tier")
560 .or_else(|| as_dict.get("tier"))
561 .map(|value| value.display())
562 .filter(|value| !value.is_empty());
563 }
564 if node.model_policy.temperature.is_none() {
565 node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
566 if let VmValue::Float(number) = value {
567 Some(*number)
568 } else {
569 value.as_int().map(|number| number as f64)
570 }
571 });
572 }
573 if node.model_policy.max_tokens.is_none() {
574 node.model_policy.max_tokens =
575 as_dict.get("max_tokens").and_then(|value| value.as_int());
576 }
577 if node.mode.is_none() {
578 node.mode = as_dict
579 .get("mode")
580 .map(|value| value.display())
581 .filter(|value| !value.is_empty());
582 }
583 if node.done_sentinel.is_none() {
584 node.done_sentinel = as_dict
585 .get("done_sentinel")
586 .map(|value| value.display())
587 .filter(|value| !value.is_empty());
588 }
589 if key == "verify"
590 && node.verify.is_none()
591 && (raw_node.contains_key("assert_text")
592 || raw_node.contains_key("command")
593 || raw_node.contains_key("expect_status")
594 || raw_node.contains_key("expect_text"))
595 {
596 node.verify = Some(serde_json::json!({
597 "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
598 "command": raw_node.get("command").map(vm_value_to_json),
599 "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
600 "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
601 }));
602 }
603 graph.nodes.insert(key.to_string(), node);
604 }
605 }
606 if graph.entry.is_empty() && graph.nodes.contains_key("act") {
607 graph.entry = "act".to_string();
608 }
609 if graph.edges.is_empty() && graph.nodes.contains_key("act") {
610 if graph.nodes.contains_key("verify") {
611 graph.edges.push(WorkflowEdge {
612 from: "act".to_string(),
613 to: "verify".to_string(),
614 branch: None,
615 label: None,
616 });
617 }
618 if graph.nodes.contains_key("repair") {
619 graph.edges.push(WorkflowEdge {
620 from: "verify".to_string(),
621 to: "repair".to_string(),
622 branch: Some("failed".to_string()),
623 label: None,
624 });
625 graph.edges.push(WorkflowEdge {
626 from: "repair".to_string(),
627 to: "verify".to_string(),
628 branch: Some("retry".to_string()),
629 label: None,
630 });
631 }
632 }
633 }
634
635 if graph.type_name.is_empty() {
636 graph.type_name = "workflow_graph".to_string();
637 }
638 if graph.id.is_empty() {
639 graph.id = new_id("workflow");
640 }
641 if graph.version == 0 {
642 graph.version = 1;
643 }
644 if graph.entry.is_empty() {
645 graph.entry = graph
646 .nodes
647 .keys()
648 .next()
649 .cloned()
650 .unwrap_or_else(|| "act".to_string());
651 }
652 for (node_id, node) in &mut graph.nodes {
653 if node.raw_tools.is_none() {
654 node.raw_tools = as_dict
655 .get("nodes")
656 .and_then(|nodes| nodes.as_dict())
657 .and_then(|nodes| nodes.get(node_id))
658 .and_then(|node_value| node_value.as_dict())
659 .and_then(|raw_node| raw_node.get("tools"))
660 .cloned();
661 }
662 if node.id.is_none() {
663 node.id = Some(node_id.clone());
664 }
665 if node.kind.is_empty() {
666 node.kind = "stage".to_string();
667 }
668 if node.join_policy.strategy.is_empty() {
669 node.join_policy.strategy = "all".to_string();
670 }
671 if node.reduce_policy.strategy.is_empty() {
672 node.reduce_policy.strategy = "concat".to_string();
673 }
674 if node.output_contract.output_kinds.is_empty() {
675 node.output_contract.output_kinds = vec![match node.kind.as_str() {
676 "verify" => "verification_result".to_string(),
677 "reduce" => node
678 .reduce_policy
679 .output_kind
680 .clone()
681 .unwrap_or_else(|| "summary".to_string()),
682 "map" => node
683 .map_policy
684 .output_kind
685 .clone()
686 .unwrap_or_else(|| "artifact".to_string()),
687 "escalation" => "plan".to_string(),
688 _ => "artifact".to_string(),
689 }];
690 }
691 if node.retry_policy.max_attempts == 0 {
692 node.retry_policy.max_attempts = 1;
693 }
694 }
695 Ok(graph)
696}
697
698pub fn validate_workflow(
699 graph: &WorkflowGraph,
700 ceiling: Option<&CapabilityPolicy>,
701) -> WorkflowValidationReport {
702 let mut errors = Vec::new();
703 let mut warnings = Vec::new();
704
705 if !graph.nodes.contains_key(&graph.entry) {
706 errors.push(format!("entry node does not exist: {}", graph.entry));
707 }
708
709 let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
710 for edge in &graph.edges {
711 if !node_ids.contains(&edge.from) {
712 errors.push(format!("edge.from references unknown node: {}", edge.from));
713 }
714 if !node_ids.contains(&edge.to) {
715 errors.push(format!("edge.to references unknown node: {}", edge.to));
716 }
717 }
718
719 let reachable_nodes = reachable_nodes(graph);
720 for node_id in &node_ids {
721 if !reachable_nodes.contains(node_id) {
722 warnings.push(format!("node is unreachable: {node_id}"));
723 }
724 }
725
726 for (node_id, node) in &graph.nodes {
727 let incoming = graph
728 .edges
729 .iter()
730 .filter(|edge| edge.to == *node_id)
731 .count();
732 let outgoing: Vec<&WorkflowEdge> = graph
733 .edges
734 .iter()
735 .filter(|edge| edge.from == *node_id)
736 .collect();
737 if let Some(min_inputs) = node.input_contract.min_inputs {
738 if let Some(max_inputs) = node.input_contract.max_inputs {
739 if min_inputs > max_inputs {
740 errors.push(format!(
741 "node {node_id}: input contract min_inputs exceeds max_inputs"
742 ));
743 }
744 }
745 }
746 match node.kind.as_str() {
747 "condition" => {
748 let has_true = outgoing
749 .iter()
750 .any(|edge| edge.branch.as_deref() == Some("true"));
751 let has_false = outgoing
752 .iter()
753 .any(|edge| edge.branch.as_deref() == Some("false"));
754 if !has_true || !has_false {
755 errors.push(format!(
756 "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
757 ));
758 }
759 }
760 "fork" if outgoing.len() < 2 => {
761 errors.push(format!(
762 "node {node_id}: fork nodes require at least two outgoing edges"
763 ));
764 }
765 "join" if incoming < 2 => {
766 warnings.push(format!(
767 "node {node_id}: join node has fewer than two incoming edges"
768 ));
769 }
770 "map"
771 if node.map_policy.items.is_empty()
772 && node.map_policy.item_artifact_kind.is_none()
773 && node.input_contract.input_kinds.is_empty() =>
774 {
775 errors.push(format!(
776 "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
777 ));
778 }
779 "reduce" if node.input_contract.input_kinds.is_empty() => {
780 warnings.push(format!(
781 "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
782 ));
783 }
784 _ => {}
785 }
786 }
787
788 if let Some(ceiling) = ceiling {
789 if let Err(error) = ceiling.intersect(&graph.capability_policy) {
790 errors.push(error);
791 }
792 for (node_id, node) in &graph.nodes {
793 if let Err(error) = ceiling.intersect(&node.capability_policy) {
794 errors.push(format!("node {node_id}: {error}"));
795 }
796 }
797 }
798
799 for diagnostic in crate::tool_surface::validate_workflow_graph(graph) {
800 let message = format!("{}: {}", diagnostic.code, diagnostic.message);
801 match diagnostic.severity {
802 crate::tool_surface::ToolSurfaceSeverity::Error => errors.push(message),
803 crate::tool_surface::ToolSurfaceSeverity::Warning => warnings.push(message),
804 }
805 }
806
807 WorkflowValidationReport {
808 valid: errors.is_empty(),
809 errors,
810 warnings,
811 reachable_nodes: reachable_nodes.into_iter().collect(),
812 }
813}
814
815fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
816 let mut seen = BTreeSet::new();
817 let mut stack = vec![graph.entry.clone()];
818 while let Some(node_id) = stack.pop() {
819 if !seen.insert(node_id.clone()) {
820 continue;
821 }
822 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
823 stack.push(edge.to.clone());
824 }
825 }
826 seen
827}
828
829fn resolve_node_session_id(node: &WorkflowNode) -> String {
835 if let Some(explicit) = node
836 .raw_model_policy
837 .as_ref()
838 .and_then(|v| v.as_dict())
839 .and_then(|d| d.get("session_id"))
840 .and_then(|v| match v {
841 VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
842 _ => None,
843 })
844 {
845 return explicit;
846 }
847 if let Some(persisted) = node
848 .metadata
849 .get("worker_session_id")
850 .and_then(|value| value.as_str())
851 .filter(|value| !value.trim().is_empty())
852 {
853 return persisted.to_string();
854 }
855 format!("workflow_stage_{}", uuid::Uuid::now_v7())
856}
857
858fn raw_auto_compact_dict(
859 node: &WorkflowNode,
860) -> Option<&std::collections::BTreeMap<String, VmValue>> {
861 node.raw_auto_compact
862 .as_ref()
863 .and_then(|value| value.as_dict())
864}
865
866fn raw_auto_compact_int(node: &WorkflowNode, key: &str) -> Option<usize> {
867 raw_auto_compact_dict(node)
868 .and_then(|dict| dict.get(key))
869 .and_then(|value| value.as_int())
870 .filter(|value| *value >= 0)
871 .map(|value| value as usize)
872}
873
874fn raw_auto_compact_string(node: &WorkflowNode, key: &str) -> Option<String> {
875 raw_auto_compact_dict(node)
876 .and_then(|dict| dict.get(key))
877 .and_then(|value| match value {
878 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
879 _ => None,
880 })
881}
882
883fn raw_model_policy_dict(node: &WorkflowNode) -> Option<&BTreeMap<String, VmValue>> {
884 node.raw_model_policy
885 .as_ref()
886 .and_then(|value| value.as_dict())
887}
888
889fn insert_json_vm_option<T: Serialize>(
890 options: &mut BTreeMap<String, VmValue>,
891 key: &str,
892 value: &T,
893) -> Result<(), VmError> {
894 let json = serde_json::to_value(value).map_err(|error| {
895 VmError::Runtime(format!("workflow stage option encode error: {error}"))
896 })?;
897 options.insert(key.to_string(), crate::stdlib::json_to_vm_value(&json));
898 Ok(())
899}
900
901fn merge_raw_model_policy_options(options: &mut BTreeMap<String, VmValue>, node: &WorkflowNode) {
902 if let Some(raw) = raw_model_policy_dict(node) {
903 for (key, value) in raw {
904 if !matches!(value, VmValue::Nil) {
905 options.insert(key.clone(), value.clone());
906 }
907 }
908 }
909}
910
911fn preserve_nested_command_policy(options: &mut BTreeMap<String, VmValue>, node: &WorkflowNode) {
912 if options.contains_key("command_policy") {
913 return;
914 }
915 let Some(command_policy) = raw_model_policy_dict(node)
916 .and_then(|dict| dict.get("policy"))
917 .and_then(|value| value.as_dict())
918 .and_then(|policy| policy.get("command_policy"))
919 else {
920 return;
921 };
922 options.insert("command_policy".to_string(), command_policy.clone());
923}
924
925fn stage_tools_value(node: &WorkflowNode) -> Option<VmValue> {
926 node.raw_tools.clone().or_else(|| {
927 if matches!(node.tools, serde_json::Value::Null) {
928 None
929 } else {
930 Some(crate::stdlib::json_to_vm_value(&node.tools))
931 }
932 })
933}
934
935fn add_stage_tools_option(
936 options: &mut BTreeMap<String, VmValue>,
937 tools_value: &Option<VmValue>,
938 tool_names: &[String],
939) {
940 if !tool_names.is_empty() {
941 if let Some(value) = tools_value.clone() {
942 options.insert("tools".to_string(), value);
943 }
944 }
945}
946
947fn workflow_stage_llm_options(
948 node: &WorkflowNode,
949 stage_session_id: &str,
950 tools_value: &Option<VmValue>,
951 tool_names: &[String],
952 stage_agent_options: &super::WorkflowStageAgentOptions,
953) -> BTreeMap<String, VmValue> {
954 let mut options = stage_agent_options.llm_options_vm_dict();
955 merge_raw_model_policy_options(&mut options, node);
956 options.insert(
957 "session_id".to_string(),
958 VmValue::String(Rc::from(stage_session_id.to_string())),
959 );
960 options.insert(
961 "tool_format".to_string(),
962 VmValue::String(Rc::from(stage_agent_options.tool_format.clone())),
963 );
964 add_stage_tools_option(&mut options, tools_value, tool_names);
965 options
966}
967
968fn add_workflow_agent_compaction_options(
969 options: &mut BTreeMap<String, VmValue>,
970 node: &WorkflowNode,
971) {
972 if !node.auto_compact.enabled {
973 options.insert("auto_compact".to_string(), VmValue::Bool(false));
974 return;
975 }
976 options.insert("auto_compact".to_string(), VmValue::Bool(true));
977 if let Some(value) = node.auto_compact.token_threshold {
978 options.insert("compact_threshold".to_string(), VmValue::Int(value as i64));
979 }
980 if let Some(value) = node.auto_compact.tool_output_max_chars {
981 options.insert(
982 "tool_output_max_chars".to_string(),
983 VmValue::Int(value as i64),
984 );
985 }
986 if let Some(value) = node.auto_compact.hard_limit_tokens {
987 options.insert("hard_limit_tokens".to_string(), VmValue::Int(value as i64));
988 }
989 if let Some(strategy) = node.auto_compact.compact_strategy.as_ref() {
990 options.insert(
991 "compact_strategy".to_string(),
992 VmValue::String(Rc::from(strategy.clone())),
993 );
994 }
995 if let Some(strategy) = node.auto_compact.hard_limit_strategy.as_ref() {
996 options.insert(
997 "hard_limit_strategy".to_string(),
998 VmValue::String(Rc::from(strategy.clone())),
999 );
1000 }
1001 if let Some(value) = raw_auto_compact_int(node, "compact_keep_last")
1002 .or_else(|| raw_auto_compact_int(node, "keep_last"))
1003 {
1004 options.insert("compact_keep_last".to_string(), VmValue::Int(value as i64));
1005 }
1006 if let Some(prompt) = raw_auto_compact_string(node, "summarize_prompt") {
1007 options.insert(
1008 "summarize_prompt".to_string(),
1009 VmValue::String(Rc::from(prompt)),
1010 );
1011 }
1012 if let Some(dict) = raw_auto_compact_dict(node) {
1013 for key in ["compress_callback", "mask_callback"] {
1014 if let Some(callback) = dict.get(key) {
1015 options.insert(key.to_string(), callback.clone());
1016 }
1017 }
1018 if let Some(callback) = dict.get("custom_compactor") {
1019 options.insert("compact_callback".to_string(), callback.clone());
1020 }
1021 }
1022}
1023
1024fn workflow_stage_agent_loop_options(
1025 node: &WorkflowNode,
1026 stage_session_id: &str,
1027 tools_value: &Option<VmValue>,
1028 tool_names: &[String],
1029 stage_agent_options: &super::WorkflowStageAgentOptions,
1030) -> Result<BTreeMap<String, VmValue>, VmError> {
1031 let mut options = stage_agent_options.agent_loop_options_vm_dict();
1032 merge_raw_model_policy_options(&mut options, node);
1033 if let Some(context) = crate::orchestration::current_workflow_skill_context() {
1034 if !options.contains_key("skills") {
1035 if let Some(registry) = context.registry {
1036 options.insert("skills".to_string(), registry);
1037 }
1038 }
1039 if !options.contains_key("skill_match") {
1040 if let Some(match_config) = context.match_config {
1041 options.insert("skill_match".to_string(), match_config);
1042 }
1043 }
1044 }
1045 preserve_nested_command_policy(&mut options, node);
1046 add_workflow_agent_compaction_options(&mut options, node);
1047 add_stage_tools_option(&mut options, tools_value, tool_names);
1048 let tool_policy = tool_capability_policy_from_spec(&node.tools);
1049 let effective_policy = tool_policy
1050 .intersect(&node.capability_policy)
1051 .map_err(VmError::Runtime)?;
1052 insert_json_vm_option(&mut options, "policy", &effective_policy)?;
1053 insert_json_vm_option(&mut options, "approval_policy", &node.approval_policy)?;
1054 options.insert(
1055 "session_id".to_string(),
1056 VmValue::String(Rc::from(stage_session_id.to_string())),
1057 );
1058 options.insert(
1059 "tool_format".to_string(),
1060 VmValue::String(Rc::from(stage_agent_options.tool_format.clone())),
1061 );
1062 Ok(options)
1063}
1064
/// Everything `prepare_stage_node` resolves for a stage before it runs.
///
/// Consumed by `execute_stage_node`, which dispatches on `result` and then
/// `run_agent_loop`, and by `complete_prepared_stage_node`, which annotates
/// the raw result and builds the output artifact.
#[derive(Clone, Debug)]
pub struct PreparedWorkflowStageNode {
    /// Rendered stage prompt produced by `prepare_workflow_stage_prompt`.
    pub prompt: String,
    /// System prompt, copied from the node's `system` field.
    pub system: Option<String>,
    /// When true (and `result` is `None`), `execute_stage_node` runs
    /// `call_agent_loop` with `agent_loop_options`; otherwise it makes a
    /// single LLM call with `llm_options`.
    pub run_agent_loop: bool,
    /// Options for the single-call LLM path; empty for `verify` stages.
    pub llm_options: BTreeMap<String, VmValue>,
    /// Options for the agent-loop path; empty unless `run_agent_loop` is set.
    pub agent_loop_options: BTreeMap<String, VmValue>,
    /// Precomputed stage result (set for `verify` stages). `None` means the
    /// stage still has to call a model.
    pub result: Option<serde_json::Value>,
    /// Input artifacts selected for this stage.
    pub selected: Vec<ArtifactRecord>,
    /// Context text rendered into the prompt.
    pub rendered_context: String,
    /// Rendered text for `verification_contracts`.
    pub rendered_verification: String,
    /// Verification contracts returned by `stage_verification_contracts`.
    pub verification_contracts: Vec<VerificationContract>,
    /// Tool format taken from the stage agent options.
    pub tool_format: String,
    /// Session id resolved for this stage by `resolve_node_session_id`.
    pub stage_session_id: String,
}
1080
1081pub async fn prepare_stage_node(
1082 node_id: &str,
1083 node: &WorkflowNode,
1084 task: &str,
1085 artifacts: &[ArtifactRecord],
1086) -> Result<PreparedWorkflowStageNode, VmError> {
1087 let selected_stage = super::select_workflow_stage_artifacts(
1088 artifacts,
1089 &node.context_policy,
1090 &node.input_contract,
1091 )
1092 .await?;
1093 let selected = selected_stage.artifacts;
1094 let context_policy = selected_stage.context_policy;
1095 let rendered_context_override = if let Some(assembler) = node.raw_context_assembler.as_ref() {
1096 let assembled =
1097 crate::stdlib::assemble::assemble_from_options(&selected, assembler).await?;
1098 Some(super::render_assembled_chunks(&assembled))
1099 } else {
1100 None
1101 };
1102 let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1103 let stage_session_id = resolve_node_session_id(node);
1104 if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1105 return Err(VmError::Runtime(format!(
1106 "workflow stage {node_id} requires an existing session \
1107 (call agent_session_open and feed session_id through model_policy \
1108 before entering this stage)"
1109 )));
1110 }
1111 if let Some(min_inputs) = node.input_contract.min_inputs {
1112 if selected.len() < min_inputs {
1113 return Err(VmError::Runtime(format!(
1114 "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1115 )));
1116 }
1117 }
1118 if let Some(max_inputs) = node.input_contract.max_inputs {
1119 if selected.len() > max_inputs {
1120 return Err(VmError::Runtime(format!(
1121 "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1122 )));
1123 }
1124 }
1125 let prepared_prompt = super::prepare_workflow_stage_prompt(
1126 task,
1127 node.task_label.as_deref(),
1128 &selected,
1129 &context_policy,
1130 rendered_context_override.as_deref(),
1131 &verification_contracts,
1132 )
1133 .await?;
1134 let prompt = prepared_prompt.prompt;
1135 let rendered_context = prepared_prompt.rendered_context;
1136 let rendered_verification = prepared_prompt.rendered_verification;
1137
1138 let tool_names = tool_names_from_spec(&node.tools);
1139 let stage_agent_options = super::prepare_workflow_stage_agent_options(
1140 node,
1141 &stage_session_id,
1142 !tool_names.is_empty(),
1143 )
1144 .await?;
1145 let tool_format = stage_agent_options.tool_format.clone();
1146 let result = if node.kind == "verify" {
1147 if let Some(command) = node
1148 .verify
1149 .as_ref()
1150 .and_then(|verify| verify.as_object())
1151 .and_then(|verify| verify.get("command"))
1152 .and_then(|value| value.as_str())
1153 .map(str::trim)
1154 .filter(|value| !value.is_empty())
1155 {
1156 let (program, args) = if cfg!(target_os = "windows") {
1157 ("cmd", vec!["/C".to_string(), command.to_string()])
1158 } else {
1159 ("/bin/sh", vec!["-lc".to_string(), command.to_string()])
1160 };
1161 let mut process_config = crate::stdlib::sandbox::ProcessCommandConfig {
1162 stdin_null: true,
1163 ..Default::default()
1164 };
1165 if let Some(context) = crate::stdlib::process::current_execution_context() {
1166 if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1167 crate::stdlib::sandbox::enforce_process_cwd(std::path::Path::new(&cwd))?;
1168 process_config.cwd = Some(std::path::PathBuf::from(cwd));
1169 }
1170 if !context.env.is_empty() {
1171 process_config.env.extend(context.env);
1172 }
1173 }
1174 let output = crate::stdlib::sandbox::command_output(program, &args, &process_config)?;
1175 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1176 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1177 let combined = if stderr.is_empty() {
1178 stdout.clone()
1179 } else if stdout.is_empty() {
1180 stderr.clone()
1181 } else {
1182 format!("{stdout}\n{stderr}")
1183 };
1184 serde_json::json!({
1185 "status": "completed",
1186 "text": combined,
1187 "visible_text": combined,
1188 "command": command,
1189 "stdout": stdout,
1190 "stderr": stderr,
1191 "exit_status": output.status.code().unwrap_or(-1),
1192 "success": output.status.success(),
1193 })
1194 } else {
1195 serde_json::json!({
1196 "status": "completed",
1197 "text": "",
1198 "visible_text": "",
1199 })
1200 }
1201 } else {
1202 let tools_value = stage_tools_value(node);
1203 let llm_options = workflow_stage_llm_options(
1204 node,
1205 &stage_session_id,
1206 &tools_value,
1207 &tool_names,
1208 &stage_agent_options,
1209 );
1210 let agent_loop_options = if stage_agent_options.run_agent_loop {
1211 workflow_stage_agent_loop_options(
1212 node,
1213 &stage_session_id,
1214 &tools_value,
1215 &tool_names,
1216 &stage_agent_options,
1217 )?
1218 } else {
1219 BTreeMap::new()
1220 };
1221 return Ok(PreparedWorkflowStageNode {
1222 prompt,
1223 system: node.system.clone(),
1224 run_agent_loop: stage_agent_options.run_agent_loop,
1225 llm_options,
1226 agent_loop_options,
1227 result: None,
1228 selected,
1229 rendered_context,
1230 rendered_verification,
1231 verification_contracts,
1232 tool_format,
1233 stage_session_id,
1234 });
1235 };
1236
1237 Ok(PreparedWorkflowStageNode {
1238 prompt,
1239 system: node.system.clone(),
1240 run_agent_loop: false,
1241 llm_options: BTreeMap::new(),
1242 agent_loop_options: BTreeMap::new(),
1243 result: Some(result),
1244 selected,
1245 rendered_context,
1246 rendered_verification,
1247 verification_contracts,
1248 tool_format,
1249 stage_session_id,
1250 })
1251}
1252
1253pub fn complete_prepared_stage_node(
1254 node_id: &str,
1255 node: &WorkflowNode,
1256 prepared: &PreparedWorkflowStageNode,
1257 mut llm_result: serde_json::Value,
1258) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1259 if let Some(payload) = llm_result.as_object_mut() {
1260 payload.insert(
1261 "prompt".to_string(),
1262 serde_json::json!(prepared.prompt.clone()),
1263 );
1264 payload.insert(
1265 "system_prompt".to_string(),
1266 serde_json::json!(node.system.clone().unwrap_or_default()),
1267 );
1268 payload.insert(
1269 "rendered_context".to_string(),
1270 serde_json::json!(prepared.rendered_context.clone()),
1271 );
1272 if !prepared.verification_contracts.is_empty() {
1273 payload.insert(
1274 "verification_contracts".to_string(),
1275 serde_json::to_value(&prepared.verification_contracts).unwrap_or_default(),
1276 );
1277 payload.insert(
1278 "rendered_verification_context".to_string(),
1279 serde_json::json!(prepared.rendered_verification.clone()),
1280 );
1281 }
1282 payload.insert(
1283 "selected_artifact_ids".to_string(),
1284 serde_json::json!(prepared
1285 .selected
1286 .iter()
1287 .map(|artifact| artifact.id.clone())
1288 .collect::<Vec<_>>()),
1289 );
1290 payload.insert(
1291 "selected_artifact_titles".to_string(),
1292 serde_json::json!(prepared
1293 .selected
1294 .iter()
1295 .map(|artifact| artifact.title.clone())
1296 .collect::<Vec<_>>()),
1297 );
1298 match payload
1299 .entry("tools".to_string())
1300 .or_insert_with(|| serde_json::json!({}))
1301 {
1302 serde_json::Value::Object(tools) => {
1303 tools.insert(
1304 "mode".to_string(),
1305 serde_json::json!(prepared.tool_format.clone()),
1306 );
1307 }
1308 slot => {
1309 *slot = serde_json::json!({ "mode": prepared.tool_format.clone() });
1310 }
1311 }
1312 }
1313
1314 let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1315 let result_transcript = llm_result
1319 .get("transcript")
1320 .cloned()
1321 .map(|value| crate::stdlib::json_to_vm_value(&value));
1322 let session_transcript = crate::agent_sessions::snapshot(&prepared.stage_session_id);
1323 let transcript = result_transcript
1324 .or(session_transcript)
1325 .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1326 let output_kind = node
1327 .output_contract
1328 .output_kinds
1329 .first()
1330 .cloned()
1331 .unwrap_or_else(|| {
1332 if node.kind == "verify" {
1333 "verification_result".to_string()
1334 } else {
1335 "artifact".to_string()
1336 }
1337 });
1338 let mut metadata = BTreeMap::new();
1339 metadata.insert(
1340 "input_artifact_ids".to_string(),
1341 serde_json::json!(prepared
1342 .selected
1343 .iter()
1344 .map(|artifact| artifact.id.clone())
1345 .collect::<Vec<_>>()),
1346 );
1347 metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1348 if !node.approval_policy.write_path_allowlist.is_empty() {
1349 metadata.insert(
1350 "changed_paths".to_string(),
1351 serde_json::json!(node.approval_policy.write_path_allowlist),
1352 );
1353 }
1354 let artifact = ArtifactRecord {
1355 type_name: "artifact".to_string(),
1356 id: new_id("artifact"),
1357 kind: output_kind,
1358 title: Some(format!("stage {node_id} output")),
1359 text: Some(visible_text),
1360 data: Some(llm_result.clone()),
1361 source: Some(node_id.to_string()),
1362 created_at: now_rfc3339(),
1363 freshness: Some("fresh".to_string()),
1364 priority: None,
1365 lineage: prepared
1366 .selected
1367 .iter()
1368 .map(|artifact| artifact.id.clone())
1369 .collect(),
1370 relevance: Some(1.0),
1371 estimated_tokens: None,
1372 stage: Some(node_id.to_string()),
1373 metadata,
1374 }
1375 .normalize();
1376
1377 Ok((llm_result, vec![artifact], transcript))
1378}
1379
1380pub async fn execute_stage_node(
1381 node_id: &str,
1382 node: &WorkflowNode,
1383 task: &str,
1384 artifacts: &[ArtifactRecord],
1385) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1386 let prepared = prepare_stage_node(node_id, node, task, artifacts).await?;
1387 let llm_result = if let Some(result) = prepared.result.clone() {
1388 result
1389 } else if prepared.run_agent_loop {
1390 let result = crate::stdlib::harn_entry::call_agent_loop(
1391 prepared.prompt.clone(),
1392 prepared.system.clone(),
1393 prepared.agent_loop_options.clone(),
1394 )
1395 .await?;
1396 crate::llm::vm_value_to_json(&result)
1397 } else {
1398 let args = vec![
1399 VmValue::String(Rc::from(prepared.prompt.clone())),
1400 prepared
1401 .system
1402 .clone()
1403 .map(|s| VmValue::String(Rc::from(s)))
1404 .unwrap_or(VmValue::Nil),
1405 VmValue::Dict(Rc::new(prepared.llm_options.clone())),
1406 ];
1407 let opts = extract_llm_options(&args)?;
1408 let result = vm_call_llm_full(&opts).await?;
1409 crate::llm::agent_loop_result_from_llm(&result, opts)
1410 };
1411 complete_prepared_stage_node(node_id, node, &prepared, llm_result)
1412}
1413
1414pub fn append_audit_entry(
1415 graph: &mut WorkflowGraph,
1416 op: &str,
1417 node_id: Option<String>,
1418 reason: Option<String>,
1419 metadata: BTreeMap<String, serde_json::Value>,
1420) {
1421 graph.audit_log.push(WorkflowAuditEntry {
1422 id: new_id("audit"),
1423 op: op.to_string(),
1424 node_id,
1425 timestamp: now_rfc3339(),
1426 reason,
1427 metadata,
1428 });
1429}