1use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10 BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11 ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
/// Key in `WorkflowNode::metadata` under which a serialized list of
/// verification contracts is stored for a stage.
pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
/// Key in `WorkflowNode::metadata` that selects the verification scope of a
/// stage; the string value `"local_only"` limits the stage to its own contract.
pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
19
/// A single node of a workflow graph.
///
/// The struct is (de)serialized with serde. Because of the struct-level
/// `#[serde(default)]`, any field missing from the input takes its `Default`
/// value. The `raw_*` fields are skipped by serde entirely and hold the
/// original VM values next to their JSON-decoded counterparts.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier; `normalize_workflow_value` fills it from the map key when unset.
    pub id: Option<String>,
    /// Node kind. `normalize_workflow_value` defaults it to `"stage"`, and
    /// `validate_workflow` has dedicated rules for `"condition"`, `"fork"`,
    /// `"join"`, `"map"` and `"reduce"`.
    pub kind: String,
    /// Optional execution mode label.
    pub mode: Option<String>,
    /// Optional prompt text for the node.
    pub prompt: Option<String>,
    /// Optional system text for the node.
    pub system: Option<String>,
    /// Optional task label.
    pub task_label: Option<String>,
    /// Optional sentinel string; presumably marks completion — confirm against the stage runner.
    pub done_sentinel: Option<String>,
    /// JSON description of the tools attached to the node (see
    /// `workflow_tool_names` for the shapes that are understood).
    pub tools: serde_json::Value,
    /// Model selection settings for this node.
    pub model_policy: ModelPolicy,
    /// Auto-compaction settings for this node.
    pub auto_compact: AutoCompactPolicy,
    /// Optional output visibility label.
    #[serde(default)]
    pub output_visibility: Option<String>,
    /// Context selection settings.
    pub context_policy: ContextPolicy,
    /// Retry settings; `normalize_workflow_value` raises `max_attempts` from 0 to 1.
    pub retry_policy: RetryPolicy,
    /// Capability policy requested by this node; checked against the ceiling in `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    /// Tool approval settings.
    pub approval_policy: super::ToolApprovalPolicy,
    /// Contract describing the inputs this node accepts.
    pub input_contract: StageContract,
    /// Contract describing the outputs this node produces.
    pub output_contract: StageContract,
    /// Branching settings.
    pub branch_semantics: BranchSemantics,
    /// Settings used by `"map"` nodes.
    pub map_policy: MapPolicy,
    /// Join settings; the strategy defaults to `"all"` during normalization.
    pub join_policy: JoinPolicy,
    /// Reduce settings; the strategy defaults to `"concat"` during normalization.
    pub reduce_policy: ReducePolicy,
    /// Escalation settings.
    pub escalation_policy: EscalationPolicy,
    /// Optional verify configuration; the object form is read by
    /// `verification_contract_from_verify`.
    pub verify: Option<serde_json::Value>,
    /// Flag whose name suggests the stage exits once verification passes — TODO confirm with the stage runner.
    #[serde(default)]
    pub exit_when_verified: bool,
    /// Free-form metadata; also carries the verification contract and scope keys.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Original VM value of `tools`, captured by `parse_workflow_node_value`.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Original VM value of `auto_compact`, captured by `parse_workflow_node_value`.
    #[serde(skip)]
    pub raw_auto_compact: Option<VmValue>,
    /// Original VM value of `model_policy`, captured by `parse_workflow_node_value`.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
}
74
75impl PartialEq for WorkflowNode {
76 fn eq(&self, other: &Self) -> bool {
77 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
78 }
79}
80
/// One individual check that belongs to a verification contract.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationRequirement {
    /// Category of the check, e.g. `"identifier"`, `"path"`, `"text"`,
    /// `"expect_status"`, `"visible_text_contains"` or `"combined_output_contains"`.
    pub kind: String,
    /// The value the check refers to (an identifier, a path, a text snippet, ...).
    pub value: String,
    /// Optional human-readable explanation of the check.
    pub note: Option<String>,
}
88
/// Verification expectations collected for a workflow node.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationContract {
    /// Id of the node the contract was derived from; not considered by `is_empty`.
    pub source_node: Option<String>,
    /// Optional summary text.
    pub summary: Option<String>,
    /// Optional verify command.
    pub command: Option<String>,
    /// Optional expected exit status of the verify command.
    pub expect_status: Option<i64>,
    /// Optional text the visible output has to contain.
    pub assert_text: Option<String>,
    /// Optional text the combined stdout/stderr of the command has to contain.
    pub expect_text: Option<String>,
    /// Identifiers that must be used with their exact spelling.
    pub required_identifiers: Vec<String>,
    /// Paths that must be preserved exactly.
    pub required_paths: Vec<String>,
    /// Exact text or wiring snippets that are required.
    pub required_text: Vec<String>,
    /// Free-form notes.
    pub notes: Vec<String>,
    /// Explicit checks plus the checks derived from the fields above.
    pub checks: Vec<VerificationRequirement>,
}
104
105impl VerificationContract {
106 fn is_empty(&self) -> bool {
107 self.summary.is_none()
108 && self.command.is_none()
109 && self.expect_status.is_none()
110 && self.assert_text.is_none()
111 && self.expect_text.is_none()
112 && self.required_identifiers.is_empty()
113 && self.required_paths.is_empty()
114 && self.required_text.is_empty()
115 && self.notes.is_empty()
116 && self.checks.is_empty()
117 }
118}
119
/// Appends the trimmed form of `value` to `values`, skipping blank input and
/// entries that are already present.
fn push_unique_string(values: &mut Vec<String>, value: &str) {
    let candidate = value.trim();
    let already_present = values
        .iter()
        .any(|existing| existing.as_str() == candidate);
    if !candidate.is_empty() && !already_present {
        values.push(candidate.to_owned());
    }
}
129
130fn push_unique_requirement(
131 values: &mut Vec<VerificationRequirement>,
132 kind: &str,
133 value: &str,
134 note: Option<&str>,
135) {
136 let trimmed_kind = kind.trim();
137 let trimmed_value = value.trim();
138 let trimmed_note = note
139 .map(str::trim)
140 .filter(|candidate| !candidate.is_empty())
141 .map(|candidate| candidate.to_string());
142 if trimmed_kind.is_empty() || trimmed_value.is_empty() {
143 return;
144 }
145 let candidate = VerificationRequirement {
146 kind: trimmed_kind.to_string(),
147 value: trimmed_value.to_string(),
148 note: trimmed_note,
149 };
150 if !values.iter().any(|existing| existing == &candidate) {
151 values.push(candidate);
152 }
153}
154
155fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
156 match value {
157 Some(serde_json::Value::String(text)) => {
158 let mut values = Vec::new();
159 push_unique_string(&mut values, text);
160 values
161 }
162 Some(serde_json::Value::Array(items)) => {
163 let mut values = Vec::new();
164 for item in items {
165 if let Some(text) = item.as_str() {
166 push_unique_string(&mut values, text);
167 }
168 }
169 values
170 }
171 _ => Vec::new(),
172 }
173}
174
175fn merge_verification_requirement_list(
176 target: &mut Vec<VerificationRequirement>,
177 value: Option<&serde_json::Value>,
178) {
179 let Some(items) = value.and_then(|raw| raw.as_array()) else {
180 return;
181 };
182 for item in items {
183 let Some(object) = item.as_object() else {
184 continue;
185 };
186 let kind = object
187 .get("kind")
188 .and_then(|value| value.as_str())
189 .unwrap_or_default();
190 let value = object
191 .get("value")
192 .and_then(|value| value.as_str())
193 .unwrap_or_default();
194 let note = object
195 .get("note")
196 .or_else(|| object.get("description"))
197 .or_else(|| object.get("reason"))
198 .and_then(|value| value.as_str());
199 push_unique_requirement(target, kind, value, note);
200 }
201}
202
203fn merge_verification_contract_fields(
204 target: &mut VerificationContract,
205 object: &serde_json::Map<String, serde_json::Value>,
206) {
207 if target.summary.is_none() {
208 target.summary = object
209 .get("summary")
210 .and_then(|value| value.as_str())
211 .map(str::trim)
212 .filter(|value| !value.is_empty())
213 .map(|value| value.to_string());
214 }
215 if target.command.is_none() {
216 target.command = object
217 .get("command")
218 .and_then(|value| value.as_str())
219 .map(str::trim)
220 .filter(|value| !value.is_empty())
221 .map(|value| value.to_string());
222 }
223 if target.expect_status.is_none() {
224 target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
225 }
226 if target.assert_text.is_none() {
227 target.assert_text = object
228 .get("assert_text")
229 .and_then(|value| value.as_str())
230 .map(str::trim)
231 .filter(|value| !value.is_empty())
232 .map(|value| value.to_string());
233 }
234 if target.expect_text.is_none() {
235 target.expect_text = object
236 .get("expect_text")
237 .and_then(|value| value.as_str())
238 .map(str::trim)
239 .filter(|value| !value.is_empty())
240 .map(|value| value.to_string());
241 }
242
243 for value in json_string_list(
244 object
245 .get("required_identifiers")
246 .or_else(|| object.get("identifiers")),
247 ) {
248 push_unique_string(&mut target.required_identifiers, &value);
249 }
250 for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
251 push_unique_string(&mut target.required_paths, &value);
252 }
253 for value in json_string_list(
254 object
255 .get("required_text")
256 .or_else(|| object.get("exact_text"))
257 .or_else(|| object.get("required_strings")),
258 ) {
259 push_unique_string(&mut target.required_text, &value);
260 }
261 for value in json_string_list(object.get("notes")) {
262 push_unique_string(&mut target.notes, &value);
263 }
264 merge_verification_requirement_list(&mut target.checks, object.get("checks"));
265}
266
267fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
268 let resolved = crate::stdlib::process::resolve_source_asset_path(path);
269 let contents = std::fs::read_to_string(&resolved).map_err(|error| {
270 VmError::Runtime(format!(
271 "workflow verification contract read failed for {}: {error}",
272 resolved.display()
273 ))
274 })?;
275 serde_json::from_str(&contents).map_err(|error| {
276 VmError::Runtime(format!(
277 "workflow verification contract parse failed for {}: {error}",
278 resolved.display()
279 ))
280 })
281}
282
283fn resolve_verification_contract_path(
284 verify: &serde_json::Map<String, serde_json::Value>,
285) -> Result<Option<serde_json::Value>, VmError> {
286 let Some(path) = verify
287 .get("contract_path")
288 .or_else(|| verify.get("verification_contract_path"))
289 .and_then(|value| value.as_str())
290 .map(str::trim)
291 .filter(|value| !value.is_empty())
292 else {
293 return Ok(None);
294 };
295 Ok(Some(load_verification_contract_file(path)?))
296}
297
298pub fn verification_contract_from_verify(
299 node_id: &str,
300 verify: Option<&serde_json::Value>,
301) -> Result<Option<VerificationContract>, VmError> {
302 let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
303 return Ok(None);
304 };
305
306 let mut contract = VerificationContract {
307 source_node: Some(node_id.to_string()),
308 ..Default::default()
309 };
310
311 if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
312 let Some(object) = file_contract.as_object() else {
313 return Err(VmError::Runtime(
314 "workflow verification contract file must parse to a JSON object".to_string(),
315 ));
316 };
317 merge_verification_contract_fields(&mut contract, object);
318 }
319
320 if let Some(inline_contract) = verify_object.get("contract") {
321 let Some(object) = inline_contract.as_object() else {
322 return Err(VmError::Runtime(
323 "workflow verify.contract must be an object".to_string(),
324 ));
325 };
326 merge_verification_contract_fields(&mut contract, object);
327 }
328
329 merge_verification_contract_fields(&mut contract, verify_object);
330
331 if let Some(assert_text) = contract.assert_text.clone() {
332 push_unique_requirement(
333 &mut contract.checks,
334 "visible_text_contains",
335 &assert_text,
336 Some("verify stage requires visible output to contain this text"),
337 );
338 }
339 if let Some(expect_text) = contract.expect_text.clone() {
340 push_unique_requirement(
341 &mut contract.checks,
342 "combined_output_contains",
343 &expect_text,
344 Some("verify command requires combined stdout/stderr to contain this text"),
345 );
346 }
347 if let Some(expect_status) = contract.expect_status {
348 push_unique_requirement(
349 &mut contract.checks,
350 "expect_status",
351 &expect_status.to_string(),
352 Some("verify command exit status must match exactly"),
353 );
354 }
355 for identifier in contract.required_identifiers.clone() {
356 push_unique_requirement(
357 &mut contract.checks,
358 "identifier",
359 &identifier,
360 Some("use this exact identifier spelling"),
361 );
362 }
363 for path in contract.required_paths.clone() {
364 push_unique_requirement(
365 &mut contract.checks,
366 "path",
367 &path,
368 Some("preserve this exact path"),
369 );
370 }
371 for text in contract.required_text.clone() {
372 push_unique_requirement(
373 &mut contract.checks,
374 "text",
375 &text,
376 Some("required exact text or wiring snippet"),
377 );
378 }
379
380 if contract.is_empty() {
381 return Ok(None);
382 }
383 Ok(Some(contract))
384}
385
386fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
387 if !values.iter().any(|existing| existing == &candidate) {
388 values.push(candidate);
389 }
390}
391
392pub fn workflow_verification_contracts(
393 graph: &WorkflowGraph,
394) -> Result<Vec<VerificationContract>, VmError> {
395 let mut contracts = Vec::new();
396 for (node_id, node) in &graph.nodes {
397 if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
398 push_unique_contract(&mut contracts, contract);
399 }
400 }
401 Ok(contracts)
402}
403
404pub fn inject_workflow_verification_contracts(
405 node: &mut WorkflowNode,
406 contracts: &[VerificationContract],
407) {
408 if contracts.is_empty() {
409 return;
410 }
411 node.metadata.insert(
412 WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
413 serde_json::to_value(contracts).unwrap_or_default(),
414 );
415}
416
417pub fn stage_verification_contracts(
418 node_id: &str,
419 node: &WorkflowNode,
420) -> Result<Vec<VerificationContract>, VmError> {
421 let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
422 let local_only = matches!(
423 node.metadata
424 .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
425 .and_then(|value| value.as_str()),
426 Some("local_only")
427 );
428 if local_only {
429 return Ok(local_contract.into_iter().collect());
430 }
431
432 let mut contracts = node
433 .metadata
434 .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
435 .cloned()
436 .map(|value| {
437 serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
438 VmError::Runtime(format!(
439 "workflow stage {node_id} verification contract metadata parse failed: {error}"
440 ))
441 })
442 })
443 .transpose()?
444 .unwrap_or_default();
445
446 if let Some(local_contract) = local_contract {
447 push_unique_contract(&mut contracts, local_contract);
448 }
449 Ok(contracts)
450}
451
452pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
453 match value {
454 serde_json::Value::Null => Vec::new(),
455 serde_json::Value::Array(items) => items
456 .iter()
457 .filter_map(|item| match item {
458 serde_json::Value::Object(map) => map
459 .get("name")
460 .and_then(|value| value.as_str())
461 .filter(|name| !name.is_empty())
462 .map(|name| name.to_string()),
463 _ => None,
464 })
465 .collect(),
466 serde_json::Value::Object(map) => {
467 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
468 return map
469 .get("tools")
470 .map(workflow_tool_names)
471 .unwrap_or_default();
472 }
473 map.get("name")
474 .and_then(|value| value.as_str())
475 .filter(|name| !name.is_empty())
476 .map(|name| vec![name.to_string()])
477 .unwrap_or_default()
478 }
479 _ => Vec::new(),
480 }
481}
482
/// Picks the most severe side-effect level from `levels`.
///
/// Known levels rank in the order `none`, `read_only`, `workspace_write`,
/// `process_exec`, `network`; any other string ranks above all of them. When
/// several entries share the highest rank, the last one is returned. An empty
/// iterator yields `None`.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const ORDER: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];

    fn rank(level: &str) -> usize {
        ORDER
            .iter()
            .position(|known| *known == level)
            .unwrap_or(ORDER.len())
    }

    levels.fold(None, |best: Option<String>, level| match best {
        // Keep the current pick only when it is strictly more severe, so ties
        // resolve to the later entry.
        Some(current) if rank(&current) > rank(&level) => Some(current),
        _ => Some(level),
    })
}
496
497fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
498 match value.and_then(|v| v.as_str()).unwrap_or("") {
499 "read" => ToolKind::Read,
500 "edit" => ToolKind::Edit,
501 "delete" => ToolKind::Delete,
502 "move" => ToolKind::Move,
503 "search" => ToolKind::Search,
504 "execute" => ToolKind::Execute,
505 "think" => ToolKind::Think,
506 "fetch" => ToolKind::Fetch,
507 _ => ToolKind::Other,
508 }
509}
510
511fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
512 let policy = map
513 .get("policy")
514 .and_then(|value| value.as_object())
515 .cloned()
516 .unwrap_or_default();
517
518 let capabilities = policy
519 .get("capabilities")
520 .and_then(|value| value.as_object())
521 .map(|caps| {
522 caps.iter()
523 .map(|(capability, ops)| {
524 let values = ops
525 .as_array()
526 .map(|items| {
527 items
528 .iter()
529 .filter_map(|item| item.as_str().map(|s| s.to_string()))
530 .collect::<Vec<_>>()
531 })
532 .unwrap_or_default();
533 (capability.clone(), values)
534 })
535 .collect::<BTreeMap<_, _>>()
536 })
537 .unwrap_or_default();
538
539 let arg_schema = if let Some(schema) = policy.get("arg_schema") {
542 serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
543 } else {
544 ToolArgSchema {
545 path_params: policy
546 .get("path_params")
547 .and_then(|value| value.as_array())
548 .map(|items| {
549 items
550 .iter()
551 .filter_map(|item| item.as_str().map(|s| s.to_string()))
552 .collect::<Vec<_>>()
553 })
554 .unwrap_or_default(),
555 arg_aliases: policy
556 .get("arg_aliases")
557 .and_then(|value| value.as_object())
558 .map(|aliases| {
559 aliases
560 .iter()
561 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
562 .collect::<BTreeMap<_, _>>()
563 })
564 .unwrap_or_default(),
565 required: policy
566 .get("required")
567 .and_then(|value| value.as_array())
568 .map(|items| {
569 items
570 .iter()
571 .filter_map(|item| item.as_str().map(|s| s.to_string()))
572 .collect::<Vec<_>>()
573 })
574 .unwrap_or_default(),
575 }
576 };
577
578 let kind = parse_tool_kind(policy.get("kind"));
579 let side_effect_level = policy
580 .get("side_effect_level")
581 .and_then(|value| value.as_str())
582 .map(SideEffectLevel::parse)
583 .unwrap_or_default();
584
585 ToolAnnotations {
586 kind,
587 side_effect_level,
588 arg_schema,
589 capabilities,
590 }
591}
592
593pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
594 match value {
595 serde_json::Value::Null => BTreeMap::new(),
596 serde_json::Value::Array(items) => items
597 .iter()
598 .filter_map(|item| match item {
599 serde_json::Value::Object(map) => map
600 .get("name")
601 .and_then(|value| value.as_str())
602 .filter(|name| !name.is_empty())
603 .map(|name| (name.to_string(), parse_tool_annotations(map))),
604 _ => None,
605 })
606 .collect(),
607 serde_json::Value::Object(map) => {
608 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
609 return map
610 .get("tools")
611 .map(workflow_tool_annotations)
612 .unwrap_or_default();
613 }
614 map.get("name")
615 .and_then(|value| value.as_str())
616 .filter(|name| !name.is_empty())
617 .map(|name| {
618 let mut annotations = BTreeMap::new();
619 annotations.insert(name.to_string(), parse_tool_annotations(map));
620 annotations
621 })
622 .unwrap_or_default()
623 }
624 _ => BTreeMap::new(),
625 }
626}
627
628pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
629 let tools = workflow_tool_names(value);
630 let tool_annotations = workflow_tool_annotations(value);
631 let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
632 for annotations in tool_annotations.values() {
633 for (capability, ops) in &annotations.capabilities {
634 let entry = capabilities.entry(capability.clone()).or_default();
635 for op in ops {
636 if !entry.contains(op) {
637 entry.push(op.clone());
638 }
639 }
640 entry.sort();
641 }
642 }
643 let side_effect_level = max_side_effect_level(
644 tool_annotations
645 .values()
646 .map(|annotations| annotations.side_effect_level.as_str().to_string())
647 .filter(|level| level != "none"),
648 );
649 CapabilityPolicy {
650 tools,
651 capabilities,
652 workspace_roots: Vec::new(),
653 side_effect_level,
654 recursion_limit: None,
655 tool_arg_constraints: Vec::new(),
656 tool_annotations,
657 }
658}
659
/// A directed edge between two workflow nodes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the node the edge starts at.
    pub from: String,
    /// Id of the node the edge leads to.
    pub to: String,
    /// Optional branch name; condition nodes need `"true"` and `"false"`
    /// edges, and the legacy act/verify/repair wiring uses `"failed"` and `"retry"`.
    pub branch: Option<String>,
    /// Optional label.
    pub label: Option<String>,
}
668
/// A workflow: nodes, the edges connecting them, and graph-level policies.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Type tag, serialized as `_type`; normalization sets it to `"workflow_graph"` when empty.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph id; normalization generates one when empty.
    pub id: String,
    /// Optional display name.
    pub name: Option<String>,
    /// Graph version; normalization raises 0 to 1.
    pub version: usize,
    /// Id of the entry node.
    pub entry: String,
    /// Nodes keyed by node id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    /// Edges between nodes.
    pub edges: Vec<WorkflowEdge>,
    /// Graph-level capability policy; checked against the ceiling in `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    /// Graph-level tool approval settings.
    pub approval_policy: super::ToolApprovalPolicy,
    /// Free-form metadata.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Audit entries attached to the graph.
    pub audit_log: Vec<WorkflowAuditEntry>,
}
685
/// One entry of a workflow graph's audit log.
// NOTE(review): the code that writes these entries is not visible here; the
// field descriptions below follow the field names only — confirm against the
// writer.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    /// Entry id.
    pub id: String,
    /// Name of the recorded operation.
    pub op: String,
    /// Id of the node concerned, if any.
    pub node_id: Option<String>,
    /// Timestamp string.
    pub timestamp: String,
    /// Optional reason.
    pub reason: Option<String>,
    /// Free-form metadata.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
696
/// Result of validating a workflow graph.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// `true` when `validate_workflow` recorded no errors.
    pub valid: bool,
    /// Problems that make the graph invalid.
    pub errors: Vec<String>,
    /// Findings that do not make the graph invalid.
    pub warnings: Vec<String>,
    /// Ids reached from the entry by following edges.
    pub reachable_nodes: Vec<String>,
}
705
706pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
707 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
708 let dict = value.as_dict();
709 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
710 node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
711 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
712 Ok(node)
713}
714
715pub fn parse_workflow_node_json(
716 json: serde_json::Value,
717 label: &str,
718) -> Result<WorkflowNode, VmError> {
719 super::parse_json_payload(json, label)
720}
721
722pub fn parse_workflow_edge_json(
723 json: serde_json::Value,
724 label: &str,
725) -> Result<WorkflowEdge, VmError> {
726 super::parse_json_payload(json, label)
727}
728
729pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
730 let mut graph: WorkflowGraph = super::parse_json_value(value)?;
731 let as_dict = value.as_dict().cloned().unwrap_or_default();
732
733 if graph.nodes.is_empty() {
734 for key in ["act", "verify", "repair"] {
735 if let Some(node_value) = as_dict.get(key) {
736 let mut node = parse_workflow_node_value(node_value, "orchestration")?;
737 let raw_node = node_value.as_dict().cloned().unwrap_or_default();
738 node.id = Some(key.to_string());
739 if node.kind.is_empty() {
740 node.kind = if key == "verify" {
741 "verify".to_string()
742 } else {
743 "stage".to_string()
744 };
745 }
746 if node.model_policy.provider.is_none() {
747 node.model_policy.provider = as_dict
748 .get("provider")
749 .map(|value| value.display())
750 .filter(|value| !value.is_empty());
751 }
752 if node.model_policy.model.is_none() {
753 node.model_policy.model = as_dict
754 .get("model")
755 .map(|value| value.display())
756 .filter(|value| !value.is_empty());
757 }
758 if node.model_policy.model_tier.is_none() {
759 node.model_policy.model_tier = as_dict
760 .get("model_tier")
761 .or_else(|| as_dict.get("tier"))
762 .map(|value| value.display())
763 .filter(|value| !value.is_empty());
764 }
765 if node.model_policy.temperature.is_none() {
766 node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
767 if let VmValue::Float(number) = value {
768 Some(*number)
769 } else {
770 value.as_int().map(|number| number as f64)
771 }
772 });
773 }
774 if node.model_policy.max_tokens.is_none() {
775 node.model_policy.max_tokens =
776 as_dict.get("max_tokens").and_then(|value| value.as_int());
777 }
778 if node.mode.is_none() {
779 node.mode = as_dict
780 .get("mode")
781 .map(|value| value.display())
782 .filter(|value| !value.is_empty());
783 }
784 if node.done_sentinel.is_none() {
785 node.done_sentinel = as_dict
786 .get("done_sentinel")
787 .map(|value| value.display())
788 .filter(|value| !value.is_empty());
789 }
790 if key == "verify"
791 && node.verify.is_none()
792 && (raw_node.contains_key("assert_text")
793 || raw_node.contains_key("command")
794 || raw_node.contains_key("expect_status")
795 || raw_node.contains_key("expect_text"))
796 {
797 node.verify = Some(serde_json::json!({
798 "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
799 "command": raw_node.get("command").map(vm_value_to_json),
800 "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
801 "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
802 }));
803 }
804 graph.nodes.insert(key.to_string(), node);
805 }
806 }
807 if graph.entry.is_empty() && graph.nodes.contains_key("act") {
808 graph.entry = "act".to_string();
809 }
810 if graph.edges.is_empty() && graph.nodes.contains_key("act") {
811 if graph.nodes.contains_key("verify") {
812 graph.edges.push(WorkflowEdge {
813 from: "act".to_string(),
814 to: "verify".to_string(),
815 branch: None,
816 label: None,
817 });
818 }
819 if graph.nodes.contains_key("repair") {
820 graph.edges.push(WorkflowEdge {
821 from: "verify".to_string(),
822 to: "repair".to_string(),
823 branch: Some("failed".to_string()),
824 label: None,
825 });
826 graph.edges.push(WorkflowEdge {
827 from: "repair".to_string(),
828 to: "verify".to_string(),
829 branch: Some("retry".to_string()),
830 label: None,
831 });
832 }
833 }
834 }
835
836 if graph.type_name.is_empty() {
837 graph.type_name = "workflow_graph".to_string();
838 }
839 if graph.id.is_empty() {
840 graph.id = new_id("workflow");
841 }
842 if graph.version == 0 {
843 graph.version = 1;
844 }
845 if graph.entry.is_empty() {
846 graph.entry = graph
847 .nodes
848 .keys()
849 .next()
850 .cloned()
851 .unwrap_or_else(|| "act".to_string());
852 }
853 for (node_id, node) in &mut graph.nodes {
854 if node.raw_tools.is_none() {
855 node.raw_tools = as_dict
856 .get("nodes")
857 .and_then(|nodes| nodes.as_dict())
858 .and_then(|nodes| nodes.get(node_id))
859 .and_then(|node_value| node_value.as_dict())
860 .and_then(|raw_node| raw_node.get("tools"))
861 .cloned();
862 }
863 if node.id.is_none() {
864 node.id = Some(node_id.clone());
865 }
866 if node.kind.is_empty() {
867 node.kind = "stage".to_string();
868 }
869 if node.join_policy.strategy.is_empty() {
870 node.join_policy.strategy = "all".to_string();
871 }
872 if node.reduce_policy.strategy.is_empty() {
873 node.reduce_policy.strategy = "concat".to_string();
874 }
875 if node.output_contract.output_kinds.is_empty() {
876 node.output_contract.output_kinds = vec![match node.kind.as_str() {
877 "verify" => "verification_result".to_string(),
878 "reduce" => node
879 .reduce_policy
880 .output_kind
881 .clone()
882 .unwrap_or_else(|| "summary".to_string()),
883 "map" => node
884 .map_policy
885 .output_kind
886 .clone()
887 .unwrap_or_else(|| "artifact".to_string()),
888 "escalation" => "plan".to_string(),
889 _ => "artifact".to_string(),
890 }];
891 }
892 if node.retry_policy.max_attempts == 0 {
893 node.retry_policy.max_attempts = 1;
894 }
895 }
896 Ok(graph)
897}
898
899pub fn validate_workflow(
900 graph: &WorkflowGraph,
901 ceiling: Option<&CapabilityPolicy>,
902) -> WorkflowValidationReport {
903 let mut errors = Vec::new();
904 let mut warnings = Vec::new();
905
906 if !graph.nodes.contains_key(&graph.entry) {
907 errors.push(format!("entry node does not exist: {}", graph.entry));
908 }
909
910 let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
911 for edge in &graph.edges {
912 if !node_ids.contains(&edge.from) {
913 errors.push(format!("edge.from references unknown node: {}", edge.from));
914 }
915 if !node_ids.contains(&edge.to) {
916 errors.push(format!("edge.to references unknown node: {}", edge.to));
917 }
918 }
919
920 let reachable_nodes = reachable_nodes(graph);
921 for node_id in &node_ids {
922 if !reachable_nodes.contains(node_id) {
923 warnings.push(format!("node is unreachable: {node_id}"));
924 }
925 }
926
927 for (node_id, node) in &graph.nodes {
928 let incoming = graph
929 .edges
930 .iter()
931 .filter(|edge| edge.to == *node_id)
932 .count();
933 let outgoing: Vec<&WorkflowEdge> = graph
934 .edges
935 .iter()
936 .filter(|edge| edge.from == *node_id)
937 .collect();
938 if let Some(min_inputs) = node.input_contract.min_inputs {
939 if let Some(max_inputs) = node.input_contract.max_inputs {
940 if min_inputs > max_inputs {
941 errors.push(format!(
942 "node {node_id}: input contract min_inputs exceeds max_inputs"
943 ));
944 }
945 }
946 }
947 match node.kind.as_str() {
948 "condition" => {
949 let has_true = outgoing
950 .iter()
951 .any(|edge| edge.branch.as_deref() == Some("true"));
952 let has_false = outgoing
953 .iter()
954 .any(|edge| edge.branch.as_deref() == Some("false"));
955 if !has_true || !has_false {
956 errors.push(format!(
957 "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
958 ));
959 }
960 }
961 "fork" if outgoing.len() < 2 => {
962 errors.push(format!(
963 "node {node_id}: fork nodes require at least two outgoing edges"
964 ));
965 }
966 "join" if incoming < 2 => {
967 warnings.push(format!(
968 "node {node_id}: join node has fewer than two incoming edges"
969 ));
970 }
971 "map"
972 if node.map_policy.items.is_empty()
973 && node.map_policy.item_artifact_kind.is_none()
974 && node.input_contract.input_kinds.is_empty() =>
975 {
976 errors.push(format!(
977 "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
978 ));
979 }
980 "reduce" if node.input_contract.input_kinds.is_empty() => {
981 warnings.push(format!(
982 "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
983 ));
984 }
985 _ => {}
986 }
987 }
988
989 if let Some(ceiling) = ceiling {
990 if let Err(error) = ceiling.intersect(&graph.capability_policy) {
991 errors.push(error);
992 }
993 for (node_id, node) in &graph.nodes {
994 if let Err(error) = ceiling.intersect(&node.capability_policy) {
995 errors.push(format!("node {node_id}: {error}"));
996 }
997 }
998 }
999
1000 WorkflowValidationReport {
1001 valid: errors.is_empty(),
1002 errors,
1003 warnings,
1004 reachable_nodes: reachable_nodes.into_iter().collect(),
1005 }
1006}
1007
1008fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
1009 let mut seen = BTreeSet::new();
1010 let mut stack = vec![graph.entry.clone()];
1011 while let Some(node_id) = stack.pop() {
1012 if !seen.insert(node_id.clone()) {
1013 continue;
1014 }
1015 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
1016 stack.push(edge.to.clone());
1017 }
1018 }
1019 seen
1020}
1021
1022fn resolve_stage_skill_registry(node: &WorkflowNode) -> Option<VmValue> {
1034 let per_node = node
1035 .raw_model_policy
1036 .as_ref()
1037 .and_then(|v| v.as_dict())
1038 .and_then(|d| d.get("skills"))
1039 .cloned()
1040 .and_then(normalize_inline_registry);
1041 if per_node.is_some() {
1042 return per_node;
1043 }
1044 super::current_workflow_skill_context().and_then(|ctx| ctx.registry)
1045}
1046
1047fn resolve_stage_skill_match(node: &WorkflowNode) -> crate::llm::SkillMatchConfig {
1051 let per_node = node
1052 .raw_model_policy
1053 .as_ref()
1054 .and_then(|v| v.as_dict())
1055 .and_then(|d| d.get("skill_match"))
1056 .and_then(|v| v.as_dict().cloned());
1057 if let Some(dict) = per_node {
1058 return crate::llm::parse_skill_match_config_dict(&dict);
1059 }
1060 super::current_workflow_skill_context()
1061 .and_then(|ctx| ctx.match_config)
1062 .and_then(|v| v.as_dict().cloned())
1063 .map(|d| crate::llm::parse_skill_match_config_dict(&d))
1064 .unwrap_or_default()
1065}
1066
1067fn normalize_inline_registry(value: VmValue) -> Option<VmValue> {
1072 use std::collections::BTreeMap;
1073 use std::rc::Rc;
1074 match &value {
1075 VmValue::Dict(d)
1076 if d.get("_type")
1077 .map(|v| v.display() == "skill_registry")
1078 .unwrap_or(false) =>
1079 {
1080 Some(value)
1081 }
1082 VmValue::List(list) => {
1083 let mut dict = BTreeMap::new();
1084 dict.insert(
1085 "_type".to_string(),
1086 VmValue::String(Rc::from("skill_registry")),
1087 );
1088 dict.insert("skills".to_string(), VmValue::List(list.clone()));
1089 Some(VmValue::Dict(Rc::new(dict)))
1090 }
1091 _ => None,
1092 }
1093}
1094
1095fn resolve_node_session_id(node: &WorkflowNode) -> String {
1096 if let Some(explicit) = node
1097 .raw_model_policy
1098 .as_ref()
1099 .and_then(|v| v.as_dict())
1100 .and_then(|d| d.get("session_id"))
1101 .and_then(|v| match v {
1102 VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
1103 _ => None,
1104 })
1105 {
1106 return explicit;
1107 }
1108 if let Some(persisted) = node
1109 .metadata
1110 .get("worker_session_id")
1111 .and_then(|value| value.as_str())
1112 .filter(|value| !value.trim().is_empty())
1113 {
1114 return persisted.to_string();
1115 }
1116 format!("workflow_stage_{}", uuid::Uuid::now_v7())
1117}
1118
1119fn raw_auto_compact_dict(
1120 node: &WorkflowNode,
1121) -> Option<&std::collections::BTreeMap<String, VmValue>> {
1122 node.raw_auto_compact
1123 .as_ref()
1124 .and_then(|value| value.as_dict())
1125}
1126
1127fn raw_auto_compact_int(node: &WorkflowNode, key: &str) -> Option<usize> {
1128 raw_auto_compact_dict(node)
1129 .and_then(|dict| dict.get(key))
1130 .and_then(|value| value.as_int())
1131 .filter(|value| *value >= 0)
1132 .map(|value| value as usize)
1133}
1134
1135fn raw_auto_compact_string(node: &WorkflowNode, key: &str) -> Option<String> {
1136 raw_auto_compact_dict(node)
1137 .and_then(|dict| dict.get(key))
1138 .and_then(|value| match value {
1139 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1140 _ => None,
1141 })
1142}
1143
1144pub(crate) async fn resolve_stage_auto_compact(
1145 node: &WorkflowNode,
1146 opts: &crate::llm::api::LlmCallOptions,
1147) -> Result<Option<crate::orchestration::AutoCompactConfig>, VmError> {
1148 if !node.auto_compact.enabled {
1149 return Ok(None);
1150 }
1151
1152 let mut ac = crate::orchestration::AutoCompactConfig::default();
1153 if let Some(v) = node.auto_compact.token_threshold {
1154 ac.token_threshold = v;
1155 }
1156 if let Some(v) = node.auto_compact.tool_output_max_chars {
1157 ac.tool_output_max_chars = v;
1158 }
1159 if let Some(ref strategy) = node.auto_compact.compact_strategy {
1160 if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1161 ac.compact_strategy = s;
1162 }
1163 }
1164 if let Some(v) = node.auto_compact.hard_limit_tokens {
1165 ac.hard_limit_tokens = Some(v);
1166 }
1167 if let Some(ref strategy) = node.auto_compact.hard_limit_strategy {
1168 if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1169 ac.hard_limit_strategy = s;
1170 }
1171 }
1172
1173 if let Some(v) = raw_auto_compact_int(node, "compact_keep_last")
1177 .or_else(|| raw_auto_compact_int(node, "keep_last"))
1178 {
1179 ac.keep_last = v;
1180 }
1181 if let Some(prompt) = raw_auto_compact_string(node, "summarize_prompt") {
1182 ac.summarize_prompt = Some(prompt);
1183 }
1184
1185 if let Some(dict) = raw_auto_compact_dict(node) {
1188 if let Some(cb) = dict.get("compress_callback") {
1189 ac.compress_callback = Some(cb.clone());
1190 }
1191 if let Some(cb) = dict.get("mask_callback") {
1192 ac.mask_callback = Some(cb.clone());
1193 }
1194 if let Some(cb) = dict.get("custom_compactor") {
1195 ac.custom_compactor = Some(cb.clone());
1196 }
1197 }
1198
1199 let user_specified_threshold = node.auto_compact.token_threshold.is_some();
1200 let user_specified_hard_limit = node.auto_compact.hard_limit_tokens.is_some();
1201 crate::llm::api::adapt_auto_compact_to_provider(
1202 &mut ac,
1203 user_specified_threshold,
1204 user_specified_hard_limit,
1205 &opts.provider,
1206 &opts.model,
1207 &opts.api_key,
1208 )
1209 .await;
1210
1211 Ok(Some(ac))
1212}
1213
1214pub async fn execute_stage_node(
1215 node_id: &str,
1216 node: &WorkflowNode,
1217 task: &str,
1218 artifacts: &[ArtifactRecord],
1219) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1220 let mut selection_policy = node.context_policy.clone();
1221 if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
1222 selection_policy.include_kinds = node.input_contract.input_kinds.clone();
1223 }
1224 let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
1225 let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
1226 let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1227 let rendered_verification = super::render_verification_context(&verification_contracts);
1228 let stage_session_id = resolve_node_session_id(node);
1229 if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1230 return Err(VmError::Runtime(format!(
1231 "workflow stage {node_id} requires an existing session \
1232 (call agent_session_open and feed session_id through model_policy \
1233 before entering this stage)"
1234 )));
1235 }
1236 if let Some(min_inputs) = node.input_contract.min_inputs {
1237 if selected.len() < min_inputs {
1238 return Err(VmError::Runtime(format!(
1239 "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1240 )));
1241 }
1242 }
1243 if let Some(max_inputs) = node.input_contract.max_inputs {
1244 if selected.len() > max_inputs {
1245 return Err(VmError::Runtime(format!(
1246 "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1247 )));
1248 }
1249 }
1250 let prompt = super::render_workflow_prompt(
1251 task,
1252 node.task_label.as_deref(),
1253 &rendered_verification,
1254 &rendered_context,
1255 );
1256
1257 let tool_format = node
1265 .model_policy
1266 .tool_format
1267 .clone()
1268 .filter(|value| !value.trim().is_empty())
1269 .or_else(|| {
1270 std::env::var("HARN_AGENT_TOOL_FORMAT")
1271 .ok()
1272 .filter(|value| !value.trim().is_empty())
1273 })
1274 .unwrap_or_else(|| {
1275 let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
1276 let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
1277 crate::llm_config::default_tool_format(&model, &provider)
1278 });
1279 let mut llm_result = if node.kind == "verify" {
1280 if let Some(command) = node
1281 .verify
1282 .as_ref()
1283 .and_then(|verify| verify.as_object())
1284 .and_then(|verify| verify.get("command"))
1285 .and_then(|value| value.as_str())
1286 .map(str::trim)
1287 .filter(|value| !value.is_empty())
1288 {
1289 let mut process = if cfg!(target_os = "windows") {
1290 let mut cmd = tokio::process::Command::new("cmd");
1291 cmd.arg("/C").arg(command);
1292 cmd
1293 } else {
1294 let mut cmd = tokio::process::Command::new("/bin/sh");
1295 cmd.arg("-lc").arg(command);
1296 cmd
1297 };
1298 process.stdin(std::process::Stdio::null());
1299 if let Some(context) = crate::stdlib::process::current_execution_context() {
1300 if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1301 process.current_dir(cwd);
1302 }
1303 if !context.env.is_empty() {
1304 process.envs(context.env);
1305 }
1306 }
1307 let output = process
1308 .output()
1309 .await
1310 .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
1311 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1312 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1313 let combined = if stderr.is_empty() {
1314 stdout.clone()
1315 } else if stdout.is_empty() {
1316 stderr.clone()
1317 } else {
1318 format!("{stdout}\n{stderr}")
1319 };
1320 serde_json::json!({
1321 "status": "completed",
1322 "text": combined,
1323 "visible_text": combined,
1324 "command": command,
1325 "stdout": stdout,
1326 "stderr": stderr,
1327 "exit_status": output.status.code().unwrap_or(-1),
1328 "success": output.status.success(),
1329 })
1330 } else {
1331 serde_json::json!({
1332 "status": "completed",
1333 "text": "",
1334 "visible_text": "",
1335 })
1336 }
1337 } else {
1338 let mut options = BTreeMap::new();
1339 if let Some(provider) = &node.model_policy.provider {
1340 options.insert(
1341 "provider".to_string(),
1342 VmValue::String(Rc::from(provider.clone())),
1343 );
1344 }
1345 if let Some(model) = &node.model_policy.model {
1346 options.insert(
1347 "model".to_string(),
1348 VmValue::String(Rc::from(model.clone())),
1349 );
1350 }
1351 if let Some(model_tier) = &node.model_policy.model_tier {
1352 options.insert(
1353 "model_tier".to_string(),
1354 VmValue::String(Rc::from(model_tier.clone())),
1355 );
1356 }
1357 if let Some(temperature) = node.model_policy.temperature {
1358 options.insert("temperature".to_string(), VmValue::Float(temperature));
1359 }
1360 if let Some(max_tokens) = node.model_policy.max_tokens {
1361 options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
1362 }
1363 let tool_names = workflow_tool_names(&node.tools);
1364 let tools_value = node.raw_tools.clone().or_else(|| {
1365 if matches!(node.tools, serde_json::Value::Null) {
1366 None
1367 } else {
1368 Some(crate::stdlib::json_to_vm_value(&node.tools))
1369 }
1370 });
1371 if tools_value.is_some() && !tool_names.is_empty() {
1372 options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
1373 }
1374 options.insert(
1375 "session_id".to_string(),
1376 VmValue::String(Rc::from(stage_session_id.clone())),
1377 );
1378
1379 let args = vec![
1380 VmValue::String(Rc::from(prompt.clone())),
1381 node.system
1382 .clone()
1383 .map(|s| VmValue::String(Rc::from(s)))
1384 .unwrap_or(VmValue::Nil),
1385 VmValue::Dict(Rc::new(options)),
1386 ];
1387 let mut opts = extract_llm_options(&args)?;
1388 let auto_compact = resolve_stage_auto_compact(node, &opts).await?;
1389
1390 if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
1391 let tool_policy = workflow_tool_policy_from_tools(&node.tools);
1392 let effective_policy = tool_policy
1393 .intersect(&node.capability_policy)
1394 .map_err(VmError::Runtime)?;
1395 crate::llm::run_agent_loop_internal(
1396 &mut opts,
1397 crate::llm::AgentLoopConfig {
1398 persistent: true,
1399 max_iterations: node.model_policy.max_iterations.unwrap_or(16),
1400 max_nudges: node.model_policy.max_nudges.unwrap_or(3),
1401 nudge: node.model_policy.nudge.clone(),
1402 done_sentinel: node.done_sentinel.clone(),
1403 break_unless_phase: None,
1404 tool_retries: 0,
1405 tool_backoff_ms: 1000,
1406 tool_format: tool_format.clone(),
1407 native_tool_fallback: node.model_policy.native_tool_fallback,
1408 auto_compact,
1409 policy: Some(effective_policy),
1410 approval_policy: Some(node.approval_policy.clone()),
1411 daemon: false,
1412 daemon_config: Default::default(),
1413 llm_retries: 2,
1414 llm_backoff_ms: 2000,
1415 token_budget: None,
1416 exit_when_verified: node.exit_when_verified,
1417 loop_detect_warn: 2,
1418 loop_detect_block: 3,
1419 loop_detect_skip: 4,
1420 tool_examples: node.model_policy.tool_examples.clone(),
1421 turn_policy: node.model_policy.turn_policy.clone(),
1422 stop_after_successful_tools: node
1423 .model_policy
1424 .stop_after_successful_tools
1425 .clone(),
1426 require_successful_tools: node.model_policy.require_successful_tools.clone(),
1427 session_id: stage_session_id.clone(),
1431 event_sink: None,
1432 task_ledger: node
1436 .raw_model_policy
1437 .as_ref()
1438 .and_then(|v| v.as_dict())
1439 .and_then(|d| d.get("task_ledger"))
1440 .map(crate::llm::helpers::vm_value_to_json)
1441 .and_then(|json| serde_json::from_value(json).ok())
1442 .unwrap_or_default(),
1443 post_turn_callback: node
1444 .raw_model_policy
1445 .as_ref()
1446 .and_then(|v| v.as_dict())
1447 .and_then(|d| d.get("post_turn_callback"))
1448 .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
1449 .cloned(),
1450 skill_registry: resolve_stage_skill_registry(node),
1457 skill_match: resolve_stage_skill_match(node),
1458 working_files: Vec::new(),
1459 },
1460 )
1461 .await?
1462 } else {
1463 let result = vm_call_llm_full(&opts).await?;
1464 crate::llm::agent_loop_result_from_llm(&result, opts)
1465 }
1466 };
1467 if let Some(payload) = llm_result.as_object_mut() {
1468 payload.insert("prompt".to_string(), serde_json::json!(prompt));
1469 payload.insert(
1470 "system_prompt".to_string(),
1471 serde_json::json!(node.system.clone().unwrap_or_default()),
1472 );
1473 payload.insert(
1474 "rendered_context".to_string(),
1475 serde_json::json!(rendered_context),
1476 );
1477 if !verification_contracts.is_empty() {
1478 payload.insert(
1479 "verification_contracts".to_string(),
1480 serde_json::to_value(&verification_contracts).unwrap_or_default(),
1481 );
1482 payload.insert(
1483 "rendered_verification_context".to_string(),
1484 serde_json::json!(rendered_verification),
1485 );
1486 }
1487 payload.insert(
1488 "selected_artifact_ids".to_string(),
1489 serde_json::json!(selected
1490 .iter()
1491 .map(|artifact| artifact.id.clone())
1492 .collect::<Vec<_>>()),
1493 );
1494 payload.insert(
1495 "selected_artifact_titles".to_string(),
1496 serde_json::json!(selected
1497 .iter()
1498 .map(|artifact| artifact.title.clone())
1499 .collect::<Vec<_>>()),
1500 );
1501 payload.insert(
1502 "tool_calling_mode".to_string(),
1503 serde_json::json!(tool_format.clone()),
1504 );
1505 }
1506
1507 let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1508 let result_transcript = llm_result
1512 .get("transcript")
1513 .cloned()
1514 .map(|value| crate::stdlib::json_to_vm_value(&value));
1515 let session_transcript = crate::agent_sessions::snapshot(&stage_session_id);
1516 let transcript = result_transcript
1517 .or(session_transcript)
1518 .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1519 let output_kind = node
1520 .output_contract
1521 .output_kinds
1522 .first()
1523 .cloned()
1524 .unwrap_or_else(|| {
1525 if node.kind == "verify" {
1526 "verification_result".to_string()
1527 } else {
1528 "artifact".to_string()
1529 }
1530 });
1531 let mut metadata = BTreeMap::new();
1532 metadata.insert(
1533 "input_artifact_ids".to_string(),
1534 serde_json::json!(selected
1535 .iter()
1536 .map(|artifact| artifact.id.clone())
1537 .collect::<Vec<_>>()),
1538 );
1539 metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1540 if !node.approval_policy.write_path_allowlist.is_empty() {
1541 metadata.insert(
1542 "changed_paths".to_string(),
1543 serde_json::json!(node.approval_policy.write_path_allowlist),
1544 );
1545 }
1546 let artifact = ArtifactRecord {
1547 type_name: "artifact".to_string(),
1548 id: new_id("artifact"),
1549 kind: output_kind,
1550 title: Some(format!("stage {node_id} output")),
1551 text: Some(visible_text),
1552 data: Some(llm_result.clone()),
1553 source: Some(node_id.to_string()),
1554 created_at: now_rfc3339(),
1555 freshness: Some("fresh".to_string()),
1556 priority: None,
1557 lineage: selected
1558 .iter()
1559 .map(|artifact| artifact.id.clone())
1560 .collect(),
1561 relevance: Some(1.0),
1562 estimated_tokens: None,
1563 stage: Some(node_id.to_string()),
1564 metadata,
1565 }
1566 .normalize();
1567
1568 Ok((llm_result, vec![artifact], transcript))
1569}
1570
1571pub fn next_nodes_for(
1572 graph: &WorkflowGraph,
1573 current: &str,
1574 branch: Option<&str>,
1575) -> Vec<WorkflowEdge> {
1576 let mut matching: Vec<WorkflowEdge> = graph
1577 .edges
1578 .iter()
1579 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1580 .cloned()
1581 .collect();
1582 if matching.is_empty() {
1583 matching = graph
1584 .edges
1585 .iter()
1586 .filter(|edge| edge.from == current && edge.branch.is_none())
1587 .cloned()
1588 .collect();
1589 }
1590 matching
1591}
1592
1593pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1594 next_nodes_for(graph, current, Some(branch))
1595 .into_iter()
1596 .next()
1597 .map(|edge| edge.to)
1598}
1599
1600pub fn append_audit_entry(
1601 graph: &mut WorkflowGraph,
1602 op: &str,
1603 node_id: Option<String>,
1604 reason: Option<String>,
1605 metadata: BTreeMap<String, serde_json::Value>,
1606) {
1607 graph.audit_log.push(WorkflowAuditEntry {
1608 id: new_id("audit"),
1609 op: op.to_string(),
1610 node_id,
1611 timestamp: now_rfc3339(),
1612 reason,
1613 metadata,
1614 });
1615}