1use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10 BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11 ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
17pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
18pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
19
20#[derive(Clone, Debug, Default, Serialize, Deserialize)]
21#[serde(default)]
22pub struct WorkflowNode {
23 pub id: Option<String>,
24 pub kind: String,
25 pub mode: Option<String>,
26 pub prompt: Option<String>,
27 pub system: Option<String>,
28 pub task_label: Option<String>,
29 pub done_sentinel: Option<String>,
30 pub tools: serde_json::Value,
31 pub model_policy: ModelPolicy,
32 pub auto_compact: AutoCompactPolicy,
37 #[serde(default)]
42 pub output_visibility: Option<String>,
43 pub context_policy: ContextPolicy,
44 pub retry_policy: RetryPolicy,
45 pub capability_policy: CapabilityPolicy,
46 pub approval_policy: super::ToolApprovalPolicy,
47 pub input_contract: StageContract,
48 pub output_contract: StageContract,
49 pub branch_semantics: BranchSemantics,
50 pub map_policy: MapPolicy,
51 pub join_policy: JoinPolicy,
52 pub reduce_policy: ReducePolicy,
53 pub escalation_policy: EscalationPolicy,
54 pub verify: Option<serde_json::Value>,
55 #[serde(default)]
60 pub exit_when_verified: bool,
61 pub metadata: BTreeMap<String, serde_json::Value>,
62 #[serde(skip)]
63 pub raw_tools: Option<VmValue>,
64 #[serde(skip)]
68 pub raw_auto_compact: Option<VmValue>,
69 #[serde(skip)]
72 pub raw_model_policy: Option<VmValue>,
73 #[serde(skip)]
78 pub raw_context_assembler: Option<VmValue>,
79}
80
81impl PartialEq for WorkflowNode {
82 fn eq(&self, other: &Self) -> bool {
83 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
84 }
85}
86
87#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
88#[serde(default)]
89pub struct VerificationRequirement {
90 pub kind: String,
91 pub value: String,
92 pub note: Option<String>,
93}
94
95#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
96#[serde(default)]
97pub struct VerificationContract {
98 pub source_node: Option<String>,
99 pub summary: Option<String>,
100 pub command: Option<String>,
101 pub expect_status: Option<i64>,
102 pub assert_text: Option<String>,
103 pub expect_text: Option<String>,
104 pub required_identifiers: Vec<String>,
105 pub required_paths: Vec<String>,
106 pub required_text: Vec<String>,
107 pub notes: Vec<String>,
108 pub checks: Vec<VerificationRequirement>,
109}
110
111impl VerificationContract {
112 fn is_empty(&self) -> bool {
113 self.summary.is_none()
114 && self.command.is_none()
115 && self.expect_status.is_none()
116 && self.assert_text.is_none()
117 && self.expect_text.is_none()
118 && self.required_identifiers.is_empty()
119 && self.required_paths.is_empty()
120 && self.required_text.is_empty()
121 && self.notes.is_empty()
122 && self.checks.is_empty()
123 }
124}
125
126fn push_unique_string(values: &mut Vec<String>, value: &str) {
127 let trimmed = value.trim();
128 if trimmed.is_empty() {
129 return;
130 }
131 if !values.iter().any(|existing| existing == trimmed) {
132 values.push(trimmed.to_string());
133 }
134}
135
136fn push_unique_requirement(
137 values: &mut Vec<VerificationRequirement>,
138 kind: &str,
139 value: &str,
140 note: Option<&str>,
141) {
142 let trimmed_kind = kind.trim();
143 let trimmed_value = value.trim();
144 let trimmed_note = note
145 .map(str::trim)
146 .filter(|candidate| !candidate.is_empty())
147 .map(|candidate| candidate.to_string());
148 if trimmed_kind.is_empty() || trimmed_value.is_empty() {
149 return;
150 }
151 let candidate = VerificationRequirement {
152 kind: trimmed_kind.to_string(),
153 value: trimmed_value.to_string(),
154 note: trimmed_note,
155 };
156 if !values.iter().any(|existing| existing == &candidate) {
157 values.push(candidate);
158 }
159}
160
161fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
162 match value {
163 Some(serde_json::Value::String(text)) => {
164 let mut values = Vec::new();
165 push_unique_string(&mut values, text);
166 values
167 }
168 Some(serde_json::Value::Array(items)) => {
169 let mut values = Vec::new();
170 for item in items {
171 if let Some(text) = item.as_str() {
172 push_unique_string(&mut values, text);
173 }
174 }
175 values
176 }
177 _ => Vec::new(),
178 }
179}
180
181fn merge_verification_requirement_list(
182 target: &mut Vec<VerificationRequirement>,
183 value: Option<&serde_json::Value>,
184) {
185 let Some(items) = value.and_then(|raw| raw.as_array()) else {
186 return;
187 };
188 for item in items {
189 let Some(object) = item.as_object() else {
190 continue;
191 };
192 let kind = object
193 .get("kind")
194 .and_then(|value| value.as_str())
195 .unwrap_or_default();
196 let value = object
197 .get("value")
198 .and_then(|value| value.as_str())
199 .unwrap_or_default();
200 let note = object
201 .get("note")
202 .or_else(|| object.get("description"))
203 .or_else(|| object.get("reason"))
204 .and_then(|value| value.as_str());
205 push_unique_requirement(target, kind, value, note);
206 }
207}
208
209fn merge_verification_contract_fields(
210 target: &mut VerificationContract,
211 object: &serde_json::Map<String, serde_json::Value>,
212) {
213 if target.summary.is_none() {
214 target.summary = object
215 .get("summary")
216 .and_then(|value| value.as_str())
217 .map(str::trim)
218 .filter(|value| !value.is_empty())
219 .map(|value| value.to_string());
220 }
221 if target.command.is_none() {
222 target.command = object
223 .get("command")
224 .and_then(|value| value.as_str())
225 .map(str::trim)
226 .filter(|value| !value.is_empty())
227 .map(|value| value.to_string());
228 }
229 if target.expect_status.is_none() {
230 target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
231 }
232 if target.assert_text.is_none() {
233 target.assert_text = object
234 .get("assert_text")
235 .and_then(|value| value.as_str())
236 .map(str::trim)
237 .filter(|value| !value.is_empty())
238 .map(|value| value.to_string());
239 }
240 if target.expect_text.is_none() {
241 target.expect_text = object
242 .get("expect_text")
243 .and_then(|value| value.as_str())
244 .map(str::trim)
245 .filter(|value| !value.is_empty())
246 .map(|value| value.to_string());
247 }
248
249 for value in json_string_list(
250 object
251 .get("required_identifiers")
252 .or_else(|| object.get("identifiers")),
253 ) {
254 push_unique_string(&mut target.required_identifiers, &value);
255 }
256 for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
257 push_unique_string(&mut target.required_paths, &value);
258 }
259 for value in json_string_list(
260 object
261 .get("required_text")
262 .or_else(|| object.get("exact_text"))
263 .or_else(|| object.get("required_strings")),
264 ) {
265 push_unique_string(&mut target.required_text, &value);
266 }
267 for value in json_string_list(object.get("notes")) {
268 push_unique_string(&mut target.notes, &value);
269 }
270 merge_verification_requirement_list(&mut target.checks, object.get("checks"));
271}
272
273fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
274 let resolved = crate::stdlib::process::resolve_source_asset_path(path);
275 let contents = std::fs::read_to_string(&resolved).map_err(|error| {
276 VmError::Runtime(format!(
277 "workflow verification contract read failed for {}: {error}",
278 resolved.display()
279 ))
280 })?;
281 serde_json::from_str(&contents).map_err(|error| {
282 VmError::Runtime(format!(
283 "workflow verification contract parse failed for {}: {error}",
284 resolved.display()
285 ))
286 })
287}
288
289fn resolve_verification_contract_path(
290 verify: &serde_json::Map<String, serde_json::Value>,
291) -> Result<Option<serde_json::Value>, VmError> {
292 let Some(path) = verify
293 .get("contract_path")
294 .or_else(|| verify.get("verification_contract_path"))
295 .and_then(|value| value.as_str())
296 .map(str::trim)
297 .filter(|value| !value.is_empty())
298 else {
299 return Ok(None);
300 };
301 Ok(Some(load_verification_contract_file(path)?))
302}
303
304pub fn verification_contract_from_verify(
305 node_id: &str,
306 verify: Option<&serde_json::Value>,
307) -> Result<Option<VerificationContract>, VmError> {
308 let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
309 return Ok(None);
310 };
311
312 let mut contract = VerificationContract {
313 source_node: Some(node_id.to_string()),
314 ..Default::default()
315 };
316
317 if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
318 let Some(object) = file_contract.as_object() else {
319 return Err(VmError::Runtime(
320 "workflow verification contract file must parse to a JSON object".to_string(),
321 ));
322 };
323 merge_verification_contract_fields(&mut contract, object);
324 }
325
326 if let Some(inline_contract) = verify_object.get("contract") {
327 let Some(object) = inline_contract.as_object() else {
328 return Err(VmError::Runtime(
329 "workflow verify.contract must be an object".to_string(),
330 ));
331 };
332 merge_verification_contract_fields(&mut contract, object);
333 }
334
335 merge_verification_contract_fields(&mut contract, verify_object);
336
337 if let Some(assert_text) = contract.assert_text.clone() {
338 push_unique_requirement(
339 &mut contract.checks,
340 "visible_text_contains",
341 &assert_text,
342 Some("verify stage requires visible output to contain this text"),
343 );
344 }
345 if let Some(expect_text) = contract.expect_text.clone() {
346 push_unique_requirement(
347 &mut contract.checks,
348 "combined_output_contains",
349 &expect_text,
350 Some("verify command requires combined stdout/stderr to contain this text"),
351 );
352 }
353 if let Some(expect_status) = contract.expect_status {
354 push_unique_requirement(
355 &mut contract.checks,
356 "expect_status",
357 &expect_status.to_string(),
358 Some("verify command exit status must match exactly"),
359 );
360 }
361 for identifier in contract.required_identifiers.clone() {
362 push_unique_requirement(
363 &mut contract.checks,
364 "identifier",
365 &identifier,
366 Some("use this exact identifier spelling"),
367 );
368 }
369 for path in contract.required_paths.clone() {
370 push_unique_requirement(
371 &mut contract.checks,
372 "path",
373 &path,
374 Some("preserve this exact path"),
375 );
376 }
377 for text in contract.required_text.clone() {
378 push_unique_requirement(
379 &mut contract.checks,
380 "text",
381 &text,
382 Some("required exact text or wiring snippet"),
383 );
384 }
385
386 if contract.is_empty() {
387 return Ok(None);
388 }
389 Ok(Some(contract))
390}
391
392fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
393 if !values.iter().any(|existing| existing == &candidate) {
394 values.push(candidate);
395 }
396}
397
398pub fn workflow_verification_contracts(
399 graph: &WorkflowGraph,
400) -> Result<Vec<VerificationContract>, VmError> {
401 let mut contracts = Vec::new();
402 for (node_id, node) in &graph.nodes {
403 if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
404 push_unique_contract(&mut contracts, contract);
405 }
406 }
407 Ok(contracts)
408}
409
410pub fn inject_workflow_verification_contracts(
411 node: &mut WorkflowNode,
412 contracts: &[VerificationContract],
413) {
414 if contracts.is_empty() {
415 return;
416 }
417 node.metadata.insert(
418 WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
419 serde_json::to_value(contracts).unwrap_or_default(),
420 );
421}
422
423pub fn stage_verification_contracts(
424 node_id: &str,
425 node: &WorkflowNode,
426) -> Result<Vec<VerificationContract>, VmError> {
427 let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
428 let local_only = matches!(
429 node.metadata
430 .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
431 .and_then(|value| value.as_str()),
432 Some("local_only")
433 );
434 if local_only {
435 return Ok(local_contract.into_iter().collect());
436 }
437
438 let mut contracts = node
439 .metadata
440 .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
441 .cloned()
442 .map(|value| {
443 serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
444 VmError::Runtime(format!(
445 "workflow stage {node_id} verification contract metadata parse failed: {error}"
446 ))
447 })
448 })
449 .transpose()?
450 .unwrap_or_default();
451
452 if let Some(local_contract) = local_contract {
453 push_unique_contract(&mut contracts, local_contract);
454 }
455 Ok(contracts)
456}
457
458pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
459 match value {
460 serde_json::Value::Null => Vec::new(),
461 serde_json::Value::Array(items) => items
462 .iter()
463 .filter_map(|item| match item {
464 serde_json::Value::Object(map) => map
465 .get("name")
466 .and_then(|value| value.as_str())
467 .filter(|name| !name.is_empty())
468 .map(|name| name.to_string()),
469 _ => None,
470 })
471 .collect(),
472 serde_json::Value::Object(map) => {
473 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
474 return map
475 .get("tools")
476 .map(workflow_tool_names)
477 .unwrap_or_default();
478 }
479 map.get("name")
480 .and_then(|value| value.as_str())
481 .filter(|name| !name.is_empty())
482 .map(|name| vec![name.to_string()])
483 .unwrap_or_default()
484 }
485 _ => Vec::new(),
486 }
487}
488
489fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
490 fn rank(v: &str) -> usize {
491 match v {
492 "none" => 0,
493 "read_only" => 1,
494 "workspace_write" => 2,
495 "process_exec" => 3,
496 "network" => 4,
497 _ => 5,
498 }
499 }
500 levels.max_by_key(|level| rank(level))
501}
502
503fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
504 match value.and_then(|v| v.as_str()).unwrap_or("") {
505 "read" => ToolKind::Read,
506 "edit" => ToolKind::Edit,
507 "delete" => ToolKind::Delete,
508 "move" => ToolKind::Move,
509 "search" => ToolKind::Search,
510 "execute" => ToolKind::Execute,
511 "think" => ToolKind::Think,
512 "fetch" => ToolKind::Fetch,
513 _ => ToolKind::Other,
514 }
515}
516
517fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
518 let policy = map
519 .get("policy")
520 .and_then(|value| value.as_object())
521 .cloned()
522 .unwrap_or_default();
523
524 let capabilities = policy
525 .get("capabilities")
526 .and_then(|value| value.as_object())
527 .map(|caps| {
528 caps.iter()
529 .map(|(capability, ops)| {
530 let values = ops
531 .as_array()
532 .map(|items| {
533 items
534 .iter()
535 .filter_map(|item| item.as_str().map(|s| s.to_string()))
536 .collect::<Vec<_>>()
537 })
538 .unwrap_or_default();
539 (capability.clone(), values)
540 })
541 .collect::<BTreeMap<_, _>>()
542 })
543 .unwrap_or_default();
544
545 let arg_schema = if let Some(schema) = policy.get("arg_schema") {
548 serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
549 } else {
550 ToolArgSchema {
551 path_params: policy
552 .get("path_params")
553 .and_then(|value| value.as_array())
554 .map(|items| {
555 items
556 .iter()
557 .filter_map(|item| item.as_str().map(|s| s.to_string()))
558 .collect::<Vec<_>>()
559 })
560 .unwrap_or_default(),
561 arg_aliases: policy
562 .get("arg_aliases")
563 .and_then(|value| value.as_object())
564 .map(|aliases| {
565 aliases
566 .iter()
567 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
568 .collect::<BTreeMap<_, _>>()
569 })
570 .unwrap_or_default(),
571 required: policy
572 .get("required")
573 .and_then(|value| value.as_array())
574 .map(|items| {
575 items
576 .iter()
577 .filter_map(|item| item.as_str().map(|s| s.to_string()))
578 .collect::<Vec<_>>()
579 })
580 .unwrap_or_default(),
581 }
582 };
583
584 let kind = parse_tool_kind(policy.get("kind"));
585 let side_effect_level = policy
586 .get("side_effect_level")
587 .and_then(|value| value.as_str())
588 .map(SideEffectLevel::parse)
589 .unwrap_or_default();
590
591 ToolAnnotations {
592 kind,
593 side_effect_level,
594 arg_schema,
595 capabilities,
596 emits_artifacts: policy
597 .get("emits_artifacts")
598 .and_then(|value| value.as_bool())
599 .unwrap_or(false),
600 result_readers: policy
601 .get("result_readers")
602 .or_else(|| policy.get("readable_result_routes"))
603 .and_then(|value| value.as_array())
604 .map(|items| {
605 items
606 .iter()
607 .filter_map(|item| item.as_str().map(|s| s.to_string()))
608 .collect::<Vec<_>>()
609 })
610 .unwrap_or_default(),
611 inline_result: policy
612 .get("inline_result")
613 .and_then(|value| value.as_bool())
614 .unwrap_or(false),
615 }
616}
617
618pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
619 match value {
620 serde_json::Value::Null => BTreeMap::new(),
621 serde_json::Value::Array(items) => items
622 .iter()
623 .filter_map(|item| match item {
624 serde_json::Value::Object(map) => map
625 .get("name")
626 .and_then(|value| value.as_str())
627 .filter(|name| !name.is_empty())
628 .map(|name| (name.to_string(), parse_tool_annotations(map))),
629 _ => None,
630 })
631 .collect(),
632 serde_json::Value::Object(map) => {
633 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
634 return map
635 .get("tools")
636 .map(workflow_tool_annotations)
637 .unwrap_or_default();
638 }
639 map.get("name")
640 .and_then(|value| value.as_str())
641 .filter(|name| !name.is_empty())
642 .map(|name| {
643 let mut annotations = BTreeMap::new();
644 annotations.insert(name.to_string(), parse_tool_annotations(map));
645 annotations
646 })
647 .unwrap_or_default()
648 }
649 _ => BTreeMap::new(),
650 }
651}
652
653pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
654 let tools = workflow_tool_names(value);
655 let tool_annotations = workflow_tool_annotations(value);
656 let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
657 for annotations in tool_annotations.values() {
658 for (capability, ops) in &annotations.capabilities {
659 let entry = capabilities.entry(capability.clone()).or_default();
660 for op in ops {
661 if !entry.contains(op) {
662 entry.push(op.clone());
663 }
664 }
665 entry.sort();
666 }
667 }
668 let side_effect_level = max_side_effect_level(
669 tool_annotations
670 .values()
671 .map(|annotations| annotations.side_effect_level.as_str().to_string())
672 .filter(|level| level != "none"),
673 );
674 CapabilityPolicy {
675 tools,
676 capabilities,
677 workspace_roots: Vec::new(),
678 side_effect_level,
679 recursion_limit: None,
680 tool_arg_constraints: Vec::new(),
681 tool_annotations,
682 }
683}
684
685#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
686#[serde(default)]
687pub struct WorkflowEdge {
688 pub from: String,
689 pub to: String,
690 pub branch: Option<String>,
691 pub label: Option<String>,
692}
693
694#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
695#[serde(default)]
696pub struct WorkflowGraph {
697 #[serde(rename = "_type")]
698 pub type_name: String,
699 pub id: String,
700 pub name: Option<String>,
701 pub version: usize,
702 pub entry: String,
703 pub nodes: BTreeMap<String, WorkflowNode>,
704 pub edges: Vec<WorkflowEdge>,
705 pub capability_policy: CapabilityPolicy,
706 pub approval_policy: super::ToolApprovalPolicy,
707 pub metadata: BTreeMap<String, serde_json::Value>,
708 pub audit_log: Vec<WorkflowAuditEntry>,
709}
710
711#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
712#[serde(default)]
713pub struct WorkflowAuditEntry {
714 pub id: String,
715 pub op: String,
716 pub node_id: Option<String>,
717 pub timestamp: String,
718 pub reason: Option<String>,
719 pub metadata: BTreeMap<String, serde_json::Value>,
720}
721
722#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
723#[serde(default)]
724pub struct WorkflowValidationReport {
725 pub valid: bool,
726 pub errors: Vec<String>,
727 pub warnings: Vec<String>,
728 pub reachable_nodes: Vec<String>,
729}
730
731pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
732 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
733 let dict = value.as_dict();
734 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
735 node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
736 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
737 node.raw_context_assembler = dict.and_then(|d| d.get("context_assembler")).cloned();
738 Ok(node)
739}
740
741pub fn parse_workflow_node_json(
742 json: serde_json::Value,
743 label: &str,
744) -> Result<WorkflowNode, VmError> {
745 super::parse_json_payload(json, label)
746}
747
748pub fn parse_workflow_edge_json(
749 json: serde_json::Value,
750 label: &str,
751) -> Result<WorkflowEdge, VmError> {
752 super::parse_json_payload(json, label)
753}
754
755pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
756 let mut graph: WorkflowGraph = super::parse_json_value(value)?;
757 let as_dict = value.as_dict().cloned().unwrap_or_default();
758
759 if graph.nodes.is_empty() {
760 for key in ["act", "verify", "repair"] {
761 if let Some(node_value) = as_dict.get(key) {
762 let mut node = parse_workflow_node_value(node_value, "orchestration")?;
763 let raw_node = node_value.as_dict().cloned().unwrap_or_default();
764 node.id = Some(key.to_string());
765 if node.kind.is_empty() {
766 node.kind = if key == "verify" {
767 "verify".to_string()
768 } else {
769 "stage".to_string()
770 };
771 }
772 if node.model_policy.provider.is_none() {
773 node.model_policy.provider = as_dict
774 .get("provider")
775 .map(|value| value.display())
776 .filter(|value| !value.is_empty());
777 }
778 if node.model_policy.model.is_none() {
779 node.model_policy.model = as_dict
780 .get("model")
781 .map(|value| value.display())
782 .filter(|value| !value.is_empty());
783 }
784 if node.model_policy.model_tier.is_none() {
785 node.model_policy.model_tier = as_dict
786 .get("model_tier")
787 .or_else(|| as_dict.get("tier"))
788 .map(|value| value.display())
789 .filter(|value| !value.is_empty());
790 }
791 if node.model_policy.temperature.is_none() {
792 node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
793 if let VmValue::Float(number) = value {
794 Some(*number)
795 } else {
796 value.as_int().map(|number| number as f64)
797 }
798 });
799 }
800 if node.model_policy.max_tokens.is_none() {
801 node.model_policy.max_tokens =
802 as_dict.get("max_tokens").and_then(|value| value.as_int());
803 }
804 if node.mode.is_none() {
805 node.mode = as_dict
806 .get("mode")
807 .map(|value| value.display())
808 .filter(|value| !value.is_empty());
809 }
810 if node.done_sentinel.is_none() {
811 node.done_sentinel = as_dict
812 .get("done_sentinel")
813 .map(|value| value.display())
814 .filter(|value| !value.is_empty());
815 }
816 if key == "verify"
817 && node.verify.is_none()
818 && (raw_node.contains_key("assert_text")
819 || raw_node.contains_key("command")
820 || raw_node.contains_key("expect_status")
821 || raw_node.contains_key("expect_text"))
822 {
823 node.verify = Some(serde_json::json!({
824 "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
825 "command": raw_node.get("command").map(vm_value_to_json),
826 "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
827 "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
828 }));
829 }
830 graph.nodes.insert(key.to_string(), node);
831 }
832 }
833 if graph.entry.is_empty() && graph.nodes.contains_key("act") {
834 graph.entry = "act".to_string();
835 }
836 if graph.edges.is_empty() && graph.nodes.contains_key("act") {
837 if graph.nodes.contains_key("verify") {
838 graph.edges.push(WorkflowEdge {
839 from: "act".to_string(),
840 to: "verify".to_string(),
841 branch: None,
842 label: None,
843 });
844 }
845 if graph.nodes.contains_key("repair") {
846 graph.edges.push(WorkflowEdge {
847 from: "verify".to_string(),
848 to: "repair".to_string(),
849 branch: Some("failed".to_string()),
850 label: None,
851 });
852 graph.edges.push(WorkflowEdge {
853 from: "repair".to_string(),
854 to: "verify".to_string(),
855 branch: Some("retry".to_string()),
856 label: None,
857 });
858 }
859 }
860 }
861
862 if graph.type_name.is_empty() {
863 graph.type_name = "workflow_graph".to_string();
864 }
865 if graph.id.is_empty() {
866 graph.id = new_id("workflow");
867 }
868 if graph.version == 0 {
869 graph.version = 1;
870 }
871 if graph.entry.is_empty() {
872 graph.entry = graph
873 .nodes
874 .keys()
875 .next()
876 .cloned()
877 .unwrap_or_else(|| "act".to_string());
878 }
879 for (node_id, node) in &mut graph.nodes {
880 if node.raw_tools.is_none() {
881 node.raw_tools = as_dict
882 .get("nodes")
883 .and_then(|nodes| nodes.as_dict())
884 .and_then(|nodes| nodes.get(node_id))
885 .and_then(|node_value| node_value.as_dict())
886 .and_then(|raw_node| raw_node.get("tools"))
887 .cloned();
888 }
889 if node.id.is_none() {
890 node.id = Some(node_id.clone());
891 }
892 if node.kind.is_empty() {
893 node.kind = "stage".to_string();
894 }
895 if node.join_policy.strategy.is_empty() {
896 node.join_policy.strategy = "all".to_string();
897 }
898 if node.reduce_policy.strategy.is_empty() {
899 node.reduce_policy.strategy = "concat".to_string();
900 }
901 if node.output_contract.output_kinds.is_empty() {
902 node.output_contract.output_kinds = vec![match node.kind.as_str() {
903 "verify" => "verification_result".to_string(),
904 "reduce" => node
905 .reduce_policy
906 .output_kind
907 .clone()
908 .unwrap_or_else(|| "summary".to_string()),
909 "map" => node
910 .map_policy
911 .output_kind
912 .clone()
913 .unwrap_or_else(|| "artifact".to_string()),
914 "escalation" => "plan".to_string(),
915 _ => "artifact".to_string(),
916 }];
917 }
918 if node.retry_policy.max_attempts == 0 {
919 node.retry_policy.max_attempts = 1;
920 }
921 }
922 Ok(graph)
923}
924
925pub fn validate_workflow(
926 graph: &WorkflowGraph,
927 ceiling: Option<&CapabilityPolicy>,
928) -> WorkflowValidationReport {
929 let mut errors = Vec::new();
930 let mut warnings = Vec::new();
931
932 if !graph.nodes.contains_key(&graph.entry) {
933 errors.push(format!("entry node does not exist: {}", graph.entry));
934 }
935
936 let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
937 for edge in &graph.edges {
938 if !node_ids.contains(&edge.from) {
939 errors.push(format!("edge.from references unknown node: {}", edge.from));
940 }
941 if !node_ids.contains(&edge.to) {
942 errors.push(format!("edge.to references unknown node: {}", edge.to));
943 }
944 }
945
946 let reachable_nodes = reachable_nodes(graph);
947 for node_id in &node_ids {
948 if !reachable_nodes.contains(node_id) {
949 warnings.push(format!("node is unreachable: {node_id}"));
950 }
951 }
952
953 for (node_id, node) in &graph.nodes {
954 let incoming = graph
955 .edges
956 .iter()
957 .filter(|edge| edge.to == *node_id)
958 .count();
959 let outgoing: Vec<&WorkflowEdge> = graph
960 .edges
961 .iter()
962 .filter(|edge| edge.from == *node_id)
963 .collect();
964 if let Some(min_inputs) = node.input_contract.min_inputs {
965 if let Some(max_inputs) = node.input_contract.max_inputs {
966 if min_inputs > max_inputs {
967 errors.push(format!(
968 "node {node_id}: input contract min_inputs exceeds max_inputs"
969 ));
970 }
971 }
972 }
973 match node.kind.as_str() {
974 "condition" => {
975 let has_true = outgoing
976 .iter()
977 .any(|edge| edge.branch.as_deref() == Some("true"));
978 let has_false = outgoing
979 .iter()
980 .any(|edge| edge.branch.as_deref() == Some("false"));
981 if !has_true || !has_false {
982 errors.push(format!(
983 "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
984 ));
985 }
986 }
987 "fork" if outgoing.len() < 2 => {
988 errors.push(format!(
989 "node {node_id}: fork nodes require at least two outgoing edges"
990 ));
991 }
992 "join" if incoming < 2 => {
993 warnings.push(format!(
994 "node {node_id}: join node has fewer than two incoming edges"
995 ));
996 }
997 "map"
998 if node.map_policy.items.is_empty()
999 && node.map_policy.item_artifact_kind.is_none()
1000 && node.input_contract.input_kinds.is_empty() =>
1001 {
1002 errors.push(format!(
1003 "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
1004 ));
1005 }
1006 "reduce" if node.input_contract.input_kinds.is_empty() => {
1007 warnings.push(format!(
1008 "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
1009 ));
1010 }
1011 _ => {}
1012 }
1013 }
1014
1015 if let Some(ceiling) = ceiling {
1016 if let Err(error) = ceiling.intersect(&graph.capability_policy) {
1017 errors.push(error);
1018 }
1019 for (node_id, node) in &graph.nodes {
1020 if let Err(error) = ceiling.intersect(&node.capability_policy) {
1021 errors.push(format!("node {node_id}: {error}"));
1022 }
1023 }
1024 }
1025
1026 for diagnostic in crate::tool_surface::validate_workflow_graph(graph) {
1027 let message = format!("{}: {}", diagnostic.code, diagnostic.message);
1028 match diagnostic.severity {
1029 crate::tool_surface::ToolSurfaceSeverity::Error => errors.push(message),
1030 crate::tool_surface::ToolSurfaceSeverity::Warning => warnings.push(message),
1031 }
1032 }
1033
1034 WorkflowValidationReport {
1035 valid: errors.is_empty(),
1036 errors,
1037 warnings,
1038 reachable_nodes: reachable_nodes.into_iter().collect(),
1039 }
1040}
1041
1042fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
1043 let mut seen = BTreeSet::new();
1044 let mut stack = vec![graph.entry.clone()];
1045 while let Some(node_id) = stack.pop() {
1046 if !seen.insert(node_id.clone()) {
1047 continue;
1048 }
1049 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
1050 stack.push(edge.to.clone());
1051 }
1052 }
1053 seen
1054}
1055
1056fn resolve_stage_skill_registry(node: &WorkflowNode) -> Option<VmValue> {
1068 let per_node = node
1069 .raw_model_policy
1070 .as_ref()
1071 .and_then(|v| v.as_dict())
1072 .and_then(|d| d.get("skills"))
1073 .cloned()
1074 .and_then(normalize_inline_registry);
1075 if per_node.is_some() {
1076 return per_node;
1077 }
1078 super::current_workflow_skill_context().and_then(|ctx| ctx.registry)
1079}
1080
1081fn resolve_stage_skill_match(node: &WorkflowNode) -> crate::llm::SkillMatchConfig {
1085 let per_node = node
1086 .raw_model_policy
1087 .as_ref()
1088 .and_then(|v| v.as_dict())
1089 .and_then(|d| d.get("skill_match"))
1090 .and_then(|v| v.as_dict().cloned());
1091 if let Some(dict) = per_node {
1092 return crate::llm::parse_skill_match_config_dict(&dict);
1093 }
1094 super::current_workflow_skill_context()
1095 .and_then(|ctx| ctx.match_config)
1096 .and_then(|v| v.as_dict().cloned())
1097 .map(|d| crate::llm::parse_skill_match_config_dict(&d))
1098 .unwrap_or_default()
1099}
1100
1101fn normalize_inline_registry(value: VmValue) -> Option<VmValue> {
1106 use std::collections::BTreeMap;
1107 use std::rc::Rc;
1108 match &value {
1109 VmValue::Dict(d)
1110 if d.get("_type")
1111 .map(|v| v.display() == "skill_registry")
1112 .unwrap_or(false) =>
1113 {
1114 Some(value)
1115 }
1116 VmValue::List(list) => {
1117 let mut dict = BTreeMap::new();
1118 dict.insert(
1119 "_type".to_string(),
1120 VmValue::String(Rc::from("skill_registry")),
1121 );
1122 dict.insert("skills".to_string(), VmValue::List(list.clone()));
1123 Some(VmValue::Dict(Rc::new(dict)))
1124 }
1125 _ => None,
1126 }
1127}
1128
1129fn resolve_node_session_id(node: &WorkflowNode) -> String {
1130 if let Some(explicit) = node
1131 .raw_model_policy
1132 .as_ref()
1133 .and_then(|v| v.as_dict())
1134 .and_then(|d| d.get("session_id"))
1135 .and_then(|v| match v {
1136 VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
1137 _ => None,
1138 })
1139 {
1140 return explicit;
1141 }
1142 if let Some(persisted) = node
1143 .metadata
1144 .get("worker_session_id")
1145 .and_then(|value| value.as_str())
1146 .filter(|value| !value.trim().is_empty())
1147 {
1148 return persisted.to_string();
1149 }
1150 format!("workflow_stage_{}", uuid::Uuid::now_v7())
1151}
1152
1153fn raw_auto_compact_dict(
1154 node: &WorkflowNode,
1155) -> Option<&std::collections::BTreeMap<String, VmValue>> {
1156 node.raw_auto_compact
1157 .as_ref()
1158 .and_then(|value| value.as_dict())
1159}
1160
1161fn raw_auto_compact_int(node: &WorkflowNode, key: &str) -> Option<usize> {
1162 raw_auto_compact_dict(node)
1163 .and_then(|dict| dict.get(key))
1164 .and_then(|value| value.as_int())
1165 .filter(|value| *value >= 0)
1166 .map(|value| value as usize)
1167}
1168
1169fn raw_auto_compact_string(node: &WorkflowNode, key: &str) -> Option<String> {
1170 raw_auto_compact_dict(node)
1171 .and_then(|dict| dict.get(key))
1172 .and_then(|value| match value {
1173 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1174 _ => None,
1175 })
1176}
1177
1178pub(crate) async fn resolve_stage_auto_compact(
1179 node: &WorkflowNode,
1180 opts: &crate::llm::api::LlmCallOptions,
1181) -> Result<Option<crate::orchestration::AutoCompactConfig>, VmError> {
1182 if !node.auto_compact.enabled {
1183 return Ok(None);
1184 }
1185
1186 let mut ac = crate::orchestration::AutoCompactConfig::default();
1187 if let Some(v) = node.auto_compact.token_threshold {
1188 ac.token_threshold = v;
1189 }
1190 if let Some(v) = node.auto_compact.tool_output_max_chars {
1191 ac.tool_output_max_chars = v;
1192 }
1193 if let Some(ref strategy) = node.auto_compact.compact_strategy {
1194 if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1195 ac.compact_strategy = s;
1196 }
1197 }
1198 if let Some(v) = node.auto_compact.hard_limit_tokens {
1199 ac.hard_limit_tokens = Some(v);
1200 }
1201 if let Some(ref strategy) = node.auto_compact.hard_limit_strategy {
1202 if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1203 ac.hard_limit_strategy = s;
1204 }
1205 }
1206
1207 if let Some(v) = raw_auto_compact_int(node, "compact_keep_last")
1211 .or_else(|| raw_auto_compact_int(node, "keep_last"))
1212 {
1213 ac.keep_last = v;
1214 }
1215 if let Some(prompt) = raw_auto_compact_string(node, "summarize_prompt") {
1216 ac.summarize_prompt = Some(prompt);
1217 }
1218
1219 if let Some(dict) = raw_auto_compact_dict(node) {
1222 if let Some(cb) = dict.get("compress_callback") {
1223 ac.compress_callback = Some(cb.clone());
1224 }
1225 if let Some(cb) = dict.get("mask_callback") {
1226 ac.mask_callback = Some(cb.clone());
1227 }
1228 if let Some(cb) = dict.get("custom_compactor") {
1229 ac.custom_compactor = Some(cb.clone());
1230 }
1231 }
1232
1233 let user_specified_threshold = node.auto_compact.token_threshold.is_some();
1234 let user_specified_hard_limit = node.auto_compact.hard_limit_tokens.is_some();
1235 crate::llm::api::adapt_auto_compact_to_provider(
1236 &mut ac,
1237 user_specified_threshold,
1238 user_specified_hard_limit,
1239 &opts.provider,
1240 &opts.model,
1241 &opts.api_key,
1242 )
1243 .await;
1244
1245 Ok(Some(ac))
1246}
1247
1248pub async fn execute_stage_node(
1249 node_id: &str,
1250 node: &WorkflowNode,
1251 task: &str,
1252 artifacts: &[ArtifactRecord],
1253) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1254 let mut selection_policy = node.context_policy.clone();
1255 if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
1256 selection_policy.include_kinds = node.input_contract.input_kinds.clone();
1257 }
1258 let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
1259 let rendered_context = if let Some(assembler) = node.raw_context_assembler.as_ref() {
1260 let assembled =
1261 crate::stdlib::assemble::assemble_from_options(&selected, assembler).await?;
1262 super::render_assembled_chunks(&assembled)
1263 } else {
1264 super::render_artifacts_context(&selected, &node.context_policy)
1265 };
1266 let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1267 let rendered_verification = super::render_verification_context(&verification_contracts);
1268 let stage_session_id = resolve_node_session_id(node);
1269 if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1270 return Err(VmError::Runtime(format!(
1271 "workflow stage {node_id} requires an existing session \
1272 (call agent_session_open and feed session_id through model_policy \
1273 before entering this stage)"
1274 )));
1275 }
1276 if let Some(min_inputs) = node.input_contract.min_inputs {
1277 if selected.len() < min_inputs {
1278 return Err(VmError::Runtime(format!(
1279 "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1280 )));
1281 }
1282 }
1283 if let Some(max_inputs) = node.input_contract.max_inputs {
1284 if selected.len() > max_inputs {
1285 return Err(VmError::Runtime(format!(
1286 "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1287 )));
1288 }
1289 }
1290 let prompt = super::render_workflow_prompt(
1291 task,
1292 node.task_label.as_deref(),
1293 &rendered_verification,
1294 &rendered_context,
1295 );
1296
1297 let tool_format = node
1305 .model_policy
1306 .tool_format
1307 .clone()
1308 .filter(|value| !value.trim().is_empty())
1309 .or_else(|| {
1310 std::env::var("HARN_AGENT_TOOL_FORMAT")
1311 .ok()
1312 .filter(|value| !value.trim().is_empty())
1313 })
1314 .unwrap_or_else(|| {
1315 let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
1316 let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
1317 crate::llm_config::default_tool_format(&model, &provider)
1318 });
1319 let mut llm_result = if node.kind == "verify" {
1320 if let Some(command) = node
1321 .verify
1322 .as_ref()
1323 .and_then(|verify| verify.as_object())
1324 .and_then(|verify| verify.get("command"))
1325 .and_then(|value| value.as_str())
1326 .map(str::trim)
1327 .filter(|value| !value.is_empty())
1328 {
1329 let (program, args) = if cfg!(target_os = "windows") {
1330 ("cmd", vec!["/C".to_string(), command.to_string()])
1331 } else {
1332 ("/bin/sh", vec!["-lc".to_string(), command.to_string()])
1333 };
1334 let mut process_config = crate::stdlib::sandbox::ProcessCommandConfig {
1335 stdin_null: true,
1336 ..Default::default()
1337 };
1338 if let Some(context) = crate::stdlib::process::current_execution_context() {
1339 if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1340 crate::stdlib::sandbox::enforce_process_cwd(std::path::Path::new(&cwd))?;
1341 process_config.cwd = Some(std::path::PathBuf::from(cwd));
1342 }
1343 if !context.env.is_empty() {
1344 process_config.env.extend(context.env);
1345 }
1346 }
1347 let output = crate::stdlib::sandbox::command_output(program, &args, &process_config)?;
1348 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1349 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1350 let combined = if stderr.is_empty() {
1351 stdout.clone()
1352 } else if stdout.is_empty() {
1353 stderr.clone()
1354 } else {
1355 format!("{stdout}\n{stderr}")
1356 };
1357 serde_json::json!({
1358 "status": "completed",
1359 "text": combined,
1360 "visible_text": combined,
1361 "command": command,
1362 "stdout": stdout,
1363 "stderr": stderr,
1364 "exit_status": output.status.code().unwrap_or(-1),
1365 "success": output.status.success(),
1366 })
1367 } else {
1368 serde_json::json!({
1369 "status": "completed",
1370 "text": "",
1371 "visible_text": "",
1372 })
1373 }
1374 } else {
1375 let mut options = BTreeMap::new();
1376 if let Some(provider) = &node.model_policy.provider {
1377 options.insert(
1378 "provider".to_string(),
1379 VmValue::String(Rc::from(provider.clone())),
1380 );
1381 }
1382 if let Some(model) = &node.model_policy.model {
1383 options.insert(
1384 "model".to_string(),
1385 VmValue::String(Rc::from(model.clone())),
1386 );
1387 }
1388 if let Some(model_tier) = &node.model_policy.model_tier {
1389 options.insert(
1390 "model_tier".to_string(),
1391 VmValue::String(Rc::from(model_tier.clone())),
1392 );
1393 }
1394 if let Some(temperature) = node.model_policy.temperature {
1395 options.insert("temperature".to_string(), VmValue::Float(temperature));
1396 }
1397 if let Some(max_tokens) = node.model_policy.max_tokens {
1398 options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
1399 }
1400 let tool_names = workflow_tool_names(&node.tools);
1401 let tools_value = node.raw_tools.clone().or_else(|| {
1402 if matches!(node.tools, serde_json::Value::Null) {
1403 None
1404 } else {
1405 Some(crate::stdlib::json_to_vm_value(&node.tools))
1406 }
1407 });
1408 if tools_value.is_some() && !tool_names.is_empty() {
1409 options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
1410 }
1411 options.insert(
1412 "session_id".to_string(),
1413 VmValue::String(Rc::from(stage_session_id.clone())),
1414 );
1415
1416 let args = vec![
1417 VmValue::String(Rc::from(prompt.clone())),
1418 node.system
1419 .clone()
1420 .map(|s| VmValue::String(Rc::from(s)))
1421 .unwrap_or(VmValue::Nil),
1422 VmValue::Dict(Rc::new(options)),
1423 ];
1424 let mut opts = extract_llm_options(&args)?;
1425 let auto_compact = resolve_stage_auto_compact(node, &opts).await?;
1426 let budget = opts.budget.clone();
1427
1428 if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
1429 let tool_policy = workflow_tool_policy_from_tools(&node.tools);
1430 let effective_policy = tool_policy
1431 .intersect(&node.capability_policy)
1432 .map_err(VmError::Runtime)?;
1433 let permissions = crate::llm::permissions::parse_dynamic_permission_policy(
1434 node.raw_model_policy
1435 .as_ref()
1436 .and_then(|value| value.as_dict())
1437 .and_then(|dict| dict.get("permissions")),
1438 "workflow model_policy",
1439 )?;
1440 crate::llm::run_agent_loop_internal(
1441 &mut opts,
1442 crate::llm::AgentLoopConfig {
1443 persistent: true,
1444 max_iterations: node.model_policy.max_iterations.unwrap_or(16),
1445 max_nudges: node.model_policy.max_nudges.unwrap_or(3),
1446 nudge: node.model_policy.nudge.clone(),
1447 done_sentinel: node.done_sentinel.clone(),
1448 break_unless_phase: None,
1449 tool_retries: 0,
1450 tool_backoff_ms: 1000,
1451 schema_retries: 0,
1452 schema_retry_nudge: crate::llm::parse_schema_nudge(
1453 &node
1454 .raw_model_policy
1455 .as_ref()
1456 .and_then(|value| value.as_dict())
1457 .cloned(),
1458 ),
1459 tool_format: tool_format.clone(),
1460 native_tool_fallback: node.model_policy.native_tool_fallback,
1461 auto_compact,
1462 policy: Some(effective_policy),
1463 command_policy: crate::orchestration::parse_command_policy_value(
1464 node.raw_model_policy
1465 .as_ref()
1466 .and_then(|value| value.as_dict())
1467 .and_then(|dict| dict.get("command_policy"))
1468 .or_else(|| {
1469 node.raw_model_policy
1470 .as_ref()
1471 .and_then(|value| value.as_dict())
1472 .and_then(|dict| dict.get("policy"))
1473 .and_then(|value| value.as_dict())
1474 .and_then(|policy| policy.get("command_policy"))
1475 }),
1476 "workflow model_policy",
1477 )?,
1478 permissions,
1479 approval_policy: Some(node.approval_policy.clone()),
1480 daemon: false,
1481 daemon_config: Default::default(),
1482 llm_retries: 2,
1483 llm_backoff_ms: 2000,
1484 token_budget: None,
1485 budget,
1486 exit_when_verified: node.exit_when_verified,
1487 loop_detect_warn: 2,
1488 loop_detect_block: 3,
1489 loop_detect_skip: 4,
1490 tool_examples: node.model_policy.tool_examples.clone(),
1491 turn_policy: node.model_policy.turn_policy.clone(),
1492 stop_after_successful_tools: node
1493 .model_policy
1494 .stop_after_successful_tools
1495 .clone(),
1496 require_successful_tools: node.model_policy.require_successful_tools.clone(),
1497 session_id: stage_session_id.clone(),
1501 event_sink: None,
1502 task_ledger: node
1506 .raw_model_policy
1507 .as_ref()
1508 .and_then(|v| v.as_dict())
1509 .and_then(|d| d.get("task_ledger"))
1510 .map(crate::llm::helpers::vm_value_to_json)
1511 .and_then(|json| serde_json::from_value(json).ok())
1512 .unwrap_or_default(),
1513 post_turn_callback: node
1514 .raw_model_policy
1515 .as_ref()
1516 .and_then(|v| v.as_dict())
1517 .and_then(|d| d.get("post_turn_callback"))
1518 .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
1519 .cloned(),
1520 skill_registry: resolve_stage_skill_registry(node),
1527 skill_match: resolve_stage_skill_match(node),
1528 working_files: Vec::new(),
1529 mcp_servers: Vec::new(),
1530 mcp_clients: Default::default(),
1531 },
1532 )
1533 .await?
1534 } else {
1535 let result = vm_call_llm_full(&opts).await?;
1536 crate::llm::agent_loop_result_from_llm(&result, opts)
1537 }
1538 };
1539 if let Some(payload) = llm_result.as_object_mut() {
1540 payload.insert("prompt".to_string(), serde_json::json!(prompt));
1541 payload.insert(
1542 "system_prompt".to_string(),
1543 serde_json::json!(node.system.clone().unwrap_or_default()),
1544 );
1545 payload.insert(
1546 "rendered_context".to_string(),
1547 serde_json::json!(rendered_context),
1548 );
1549 if !verification_contracts.is_empty() {
1550 payload.insert(
1551 "verification_contracts".to_string(),
1552 serde_json::to_value(&verification_contracts).unwrap_or_default(),
1553 );
1554 payload.insert(
1555 "rendered_verification_context".to_string(),
1556 serde_json::json!(rendered_verification),
1557 );
1558 }
1559 payload.insert(
1560 "selected_artifact_ids".to_string(),
1561 serde_json::json!(selected
1562 .iter()
1563 .map(|artifact| artifact.id.clone())
1564 .collect::<Vec<_>>()),
1565 );
1566 payload.insert(
1567 "selected_artifact_titles".to_string(),
1568 serde_json::json!(selected
1569 .iter()
1570 .map(|artifact| artifact.title.clone())
1571 .collect::<Vec<_>>()),
1572 );
1573 match payload
1574 .entry("tools".to_string())
1575 .or_insert_with(|| serde_json::json!({}))
1576 {
1577 serde_json::Value::Object(tools) => {
1578 tools.insert("mode".to_string(), serde_json::json!(tool_format.clone()));
1579 }
1580 slot => {
1581 *slot = serde_json::json!({ "mode": tool_format.clone() });
1582 }
1583 }
1584 }
1585
1586 let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1587 let result_transcript = llm_result
1591 .get("transcript")
1592 .cloned()
1593 .map(|value| crate::stdlib::json_to_vm_value(&value));
1594 let session_transcript = crate::agent_sessions::snapshot(&stage_session_id);
1595 let transcript = result_transcript
1596 .or(session_transcript)
1597 .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1598 let output_kind = node
1599 .output_contract
1600 .output_kinds
1601 .first()
1602 .cloned()
1603 .unwrap_or_else(|| {
1604 if node.kind == "verify" {
1605 "verification_result".to_string()
1606 } else {
1607 "artifact".to_string()
1608 }
1609 });
1610 let mut metadata = BTreeMap::new();
1611 metadata.insert(
1612 "input_artifact_ids".to_string(),
1613 serde_json::json!(selected
1614 .iter()
1615 .map(|artifact| artifact.id.clone())
1616 .collect::<Vec<_>>()),
1617 );
1618 metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1619 if !node.approval_policy.write_path_allowlist.is_empty() {
1620 metadata.insert(
1621 "changed_paths".to_string(),
1622 serde_json::json!(node.approval_policy.write_path_allowlist),
1623 );
1624 }
1625 let artifact = ArtifactRecord {
1626 type_name: "artifact".to_string(),
1627 id: new_id("artifact"),
1628 kind: output_kind,
1629 title: Some(format!("stage {node_id} output")),
1630 text: Some(visible_text),
1631 data: Some(llm_result.clone()),
1632 source: Some(node_id.to_string()),
1633 created_at: now_rfc3339(),
1634 freshness: Some("fresh".to_string()),
1635 priority: None,
1636 lineage: selected
1637 .iter()
1638 .map(|artifact| artifact.id.clone())
1639 .collect(),
1640 relevance: Some(1.0),
1641 estimated_tokens: None,
1642 stage: Some(node_id.to_string()),
1643 metadata,
1644 }
1645 .normalize();
1646
1647 Ok((llm_result, vec![artifact], transcript))
1648}
1649
1650pub fn next_nodes_for(
1651 graph: &WorkflowGraph,
1652 current: &str,
1653 branch: Option<&str>,
1654) -> Vec<WorkflowEdge> {
1655 let mut matching: Vec<WorkflowEdge> = graph
1656 .edges
1657 .iter()
1658 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1659 .cloned()
1660 .collect();
1661 if matching.is_empty() {
1662 matching = graph
1663 .edges
1664 .iter()
1665 .filter(|edge| edge.from == current && edge.branch.is_none())
1666 .cloned()
1667 .collect();
1668 }
1669 matching
1670}
1671
1672pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1673 next_nodes_for(graph, current, Some(branch))
1674 .into_iter()
1675 .next()
1676 .map(|edge| edge.to)
1677}
1678
1679pub fn append_audit_entry(
1680 graph: &mut WorkflowGraph,
1681 op: &str,
1682 node_id: Option<String>,
1683 reason: Option<String>,
1684 metadata: BTreeMap<String, serde_json::Value>,
1685) {
1686 graph.audit_log.push(WorkflowAuditEntry {
1687 id: new_id("audit"),
1688 op: op.to_string(),
1689 node_id,
1690 timestamp: now_rfc3339(),
1691 reason,
1692 metadata,
1693 });
1694}