1use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10 BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11 ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
17pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
18pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
19
20#[derive(Clone, Debug, Default, Serialize, Deserialize)]
21#[serde(default)]
22pub struct WorkflowNode {
23 pub id: Option<String>,
24 pub kind: String,
25 pub mode: Option<String>,
26 pub prompt: Option<String>,
27 pub system: Option<String>,
28 pub task_label: Option<String>,
29 pub done_sentinel: Option<String>,
30 pub tools: serde_json::Value,
31 pub model_policy: ModelPolicy,
32 pub auto_compact: AutoCompactPolicy,
37 #[serde(default)]
42 pub output_visibility: Option<String>,
43 pub context_policy: ContextPolicy,
44 pub retry_policy: RetryPolicy,
45 pub capability_policy: CapabilityPolicy,
46 pub approval_policy: super::ToolApprovalPolicy,
47 pub input_contract: StageContract,
48 pub output_contract: StageContract,
49 pub branch_semantics: BranchSemantics,
50 pub map_policy: MapPolicy,
51 pub join_policy: JoinPolicy,
52 pub reduce_policy: ReducePolicy,
53 pub escalation_policy: EscalationPolicy,
54 pub verify: Option<serde_json::Value>,
55 #[serde(default)]
60 pub exit_when_verified: bool,
61 pub metadata: BTreeMap<String, serde_json::Value>,
62 #[serde(skip)]
63 pub raw_tools: Option<VmValue>,
64 #[serde(skip)]
68 pub raw_auto_compact: Option<VmValue>,
69 #[serde(skip)]
72 pub raw_model_policy: Option<VmValue>,
73 #[serde(skip)]
78 pub raw_context_assembler: Option<VmValue>,
79}
80
81impl PartialEq for WorkflowNode {
82 fn eq(&self, other: &Self) -> bool {
83 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
84 }
85}
86
87#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
88#[serde(default)]
89pub struct VerificationRequirement {
90 pub kind: String,
91 pub value: String,
92 pub note: Option<String>,
93}
94
95#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
96#[serde(default)]
97pub struct VerificationContract {
98 pub source_node: Option<String>,
99 pub summary: Option<String>,
100 pub command: Option<String>,
101 pub expect_status: Option<i64>,
102 pub assert_text: Option<String>,
103 pub expect_text: Option<String>,
104 pub required_identifiers: Vec<String>,
105 pub required_paths: Vec<String>,
106 pub required_text: Vec<String>,
107 pub notes: Vec<String>,
108 pub checks: Vec<VerificationRequirement>,
109}
110
111impl VerificationContract {
112 fn is_empty(&self) -> bool {
113 self.summary.is_none()
114 && self.command.is_none()
115 && self.expect_status.is_none()
116 && self.assert_text.is_none()
117 && self.expect_text.is_none()
118 && self.required_identifiers.is_empty()
119 && self.required_paths.is_empty()
120 && self.required_text.is_empty()
121 && self.notes.is_empty()
122 && self.checks.is_empty()
123 }
124}
125
126fn push_unique_string(values: &mut Vec<String>, value: &str) {
127 let trimmed = value.trim();
128 if trimmed.is_empty() {
129 return;
130 }
131 if !values.iter().any(|existing| existing == trimmed) {
132 values.push(trimmed.to_string());
133 }
134}
135
136fn push_unique_requirement(
137 values: &mut Vec<VerificationRequirement>,
138 kind: &str,
139 value: &str,
140 note: Option<&str>,
141) {
142 let trimmed_kind = kind.trim();
143 let trimmed_value = value.trim();
144 let trimmed_note = note
145 .map(str::trim)
146 .filter(|candidate| !candidate.is_empty())
147 .map(|candidate| candidate.to_string());
148 if trimmed_kind.is_empty() || trimmed_value.is_empty() {
149 return;
150 }
151 let candidate = VerificationRequirement {
152 kind: trimmed_kind.to_string(),
153 value: trimmed_value.to_string(),
154 note: trimmed_note,
155 };
156 if !values.iter().any(|existing| existing == &candidate) {
157 values.push(candidate);
158 }
159}
160
161fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
162 match value {
163 Some(serde_json::Value::String(text)) => {
164 let mut values = Vec::new();
165 push_unique_string(&mut values, text);
166 values
167 }
168 Some(serde_json::Value::Array(items)) => {
169 let mut values = Vec::new();
170 for item in items {
171 if let Some(text) = item.as_str() {
172 push_unique_string(&mut values, text);
173 }
174 }
175 values
176 }
177 _ => Vec::new(),
178 }
179}
180
181fn merge_verification_requirement_list(
182 target: &mut Vec<VerificationRequirement>,
183 value: Option<&serde_json::Value>,
184) {
185 let Some(items) = value.and_then(|raw| raw.as_array()) else {
186 return;
187 };
188 for item in items {
189 let Some(object) = item.as_object() else {
190 continue;
191 };
192 let kind = object
193 .get("kind")
194 .and_then(|value| value.as_str())
195 .unwrap_or_default();
196 let value = object
197 .get("value")
198 .and_then(|value| value.as_str())
199 .unwrap_or_default();
200 let note = object
201 .get("note")
202 .or_else(|| object.get("description"))
203 .or_else(|| object.get("reason"))
204 .and_then(|value| value.as_str());
205 push_unique_requirement(target, kind, value, note);
206 }
207}
208
209fn merge_verification_contract_fields(
210 target: &mut VerificationContract,
211 object: &serde_json::Map<String, serde_json::Value>,
212) {
213 if target.summary.is_none() {
214 target.summary = object
215 .get("summary")
216 .and_then(|value| value.as_str())
217 .map(str::trim)
218 .filter(|value| !value.is_empty())
219 .map(|value| value.to_string());
220 }
221 if target.command.is_none() {
222 target.command = object
223 .get("command")
224 .and_then(|value| value.as_str())
225 .map(str::trim)
226 .filter(|value| !value.is_empty())
227 .map(|value| value.to_string());
228 }
229 if target.expect_status.is_none() {
230 target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
231 }
232 if target.assert_text.is_none() {
233 target.assert_text = object
234 .get("assert_text")
235 .and_then(|value| value.as_str())
236 .map(str::trim)
237 .filter(|value| !value.is_empty())
238 .map(|value| value.to_string());
239 }
240 if target.expect_text.is_none() {
241 target.expect_text = object
242 .get("expect_text")
243 .and_then(|value| value.as_str())
244 .map(str::trim)
245 .filter(|value| !value.is_empty())
246 .map(|value| value.to_string());
247 }
248
249 for value in json_string_list(
250 object
251 .get("required_identifiers")
252 .or_else(|| object.get("identifiers")),
253 ) {
254 push_unique_string(&mut target.required_identifiers, &value);
255 }
256 for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
257 push_unique_string(&mut target.required_paths, &value);
258 }
259 for value in json_string_list(
260 object
261 .get("required_text")
262 .or_else(|| object.get("exact_text"))
263 .or_else(|| object.get("required_strings")),
264 ) {
265 push_unique_string(&mut target.required_text, &value);
266 }
267 for value in json_string_list(object.get("notes")) {
268 push_unique_string(&mut target.notes, &value);
269 }
270 merge_verification_requirement_list(&mut target.checks, object.get("checks"));
271}
272
273fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
274 let resolved = crate::stdlib::process::resolve_source_asset_path(path);
275 let contents = std::fs::read_to_string(&resolved).map_err(|error| {
276 VmError::Runtime(format!(
277 "workflow verification contract read failed for {}: {error}",
278 resolved.display()
279 ))
280 })?;
281 serde_json::from_str(&contents).map_err(|error| {
282 VmError::Runtime(format!(
283 "workflow verification contract parse failed for {}: {error}",
284 resolved.display()
285 ))
286 })
287}
288
289fn resolve_verification_contract_path(
290 verify: &serde_json::Map<String, serde_json::Value>,
291) -> Result<Option<serde_json::Value>, VmError> {
292 let Some(path) = verify
293 .get("contract_path")
294 .or_else(|| verify.get("verification_contract_path"))
295 .and_then(|value| value.as_str())
296 .map(str::trim)
297 .filter(|value| !value.is_empty())
298 else {
299 return Ok(None);
300 };
301 Ok(Some(load_verification_contract_file(path)?))
302}
303
304pub fn verification_contract_from_verify(
305 node_id: &str,
306 verify: Option<&serde_json::Value>,
307) -> Result<Option<VerificationContract>, VmError> {
308 let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
309 return Ok(None);
310 };
311
312 let mut contract = VerificationContract {
313 source_node: Some(node_id.to_string()),
314 ..Default::default()
315 };
316
317 if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
318 let Some(object) = file_contract.as_object() else {
319 return Err(VmError::Runtime(
320 "workflow verification contract file must parse to a JSON object".to_string(),
321 ));
322 };
323 merge_verification_contract_fields(&mut contract, object);
324 }
325
326 if let Some(inline_contract) = verify_object.get("contract") {
327 let Some(object) = inline_contract.as_object() else {
328 return Err(VmError::Runtime(
329 "workflow verify.contract must be an object".to_string(),
330 ));
331 };
332 merge_verification_contract_fields(&mut contract, object);
333 }
334
335 merge_verification_contract_fields(&mut contract, verify_object);
336
337 if let Some(assert_text) = contract.assert_text.clone() {
338 push_unique_requirement(
339 &mut contract.checks,
340 "visible_text_contains",
341 &assert_text,
342 Some("verify stage requires visible output to contain this text"),
343 );
344 }
345 if let Some(expect_text) = contract.expect_text.clone() {
346 push_unique_requirement(
347 &mut contract.checks,
348 "combined_output_contains",
349 &expect_text,
350 Some("verify command requires combined stdout/stderr to contain this text"),
351 );
352 }
353 if let Some(expect_status) = contract.expect_status {
354 push_unique_requirement(
355 &mut contract.checks,
356 "expect_status",
357 &expect_status.to_string(),
358 Some("verify command exit status must match exactly"),
359 );
360 }
361 for identifier in contract.required_identifiers.clone() {
362 push_unique_requirement(
363 &mut contract.checks,
364 "identifier",
365 &identifier,
366 Some("use this exact identifier spelling"),
367 );
368 }
369 for path in contract.required_paths.clone() {
370 push_unique_requirement(
371 &mut contract.checks,
372 "path",
373 &path,
374 Some("preserve this exact path"),
375 );
376 }
377 for text in contract.required_text.clone() {
378 push_unique_requirement(
379 &mut contract.checks,
380 "text",
381 &text,
382 Some("required exact text or wiring snippet"),
383 );
384 }
385
386 if contract.is_empty() {
387 return Ok(None);
388 }
389 Ok(Some(contract))
390}
391
392fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
393 if !values.iter().any(|existing| existing == &candidate) {
394 values.push(candidate);
395 }
396}
397
398pub fn workflow_verification_contracts(
399 graph: &WorkflowGraph,
400) -> Result<Vec<VerificationContract>, VmError> {
401 let mut contracts = Vec::new();
402 for (node_id, node) in &graph.nodes {
403 if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
404 push_unique_contract(&mut contracts, contract);
405 }
406 }
407 Ok(contracts)
408}
409
410pub fn inject_workflow_verification_contracts(
411 node: &mut WorkflowNode,
412 contracts: &[VerificationContract],
413) {
414 if contracts.is_empty() {
415 return;
416 }
417 node.metadata.insert(
418 WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
419 serde_json::to_value(contracts).unwrap_or_default(),
420 );
421}
422
423pub fn stage_verification_contracts(
424 node_id: &str,
425 node: &WorkflowNode,
426) -> Result<Vec<VerificationContract>, VmError> {
427 let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
428 let local_only = matches!(
429 node.metadata
430 .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
431 .and_then(|value| value.as_str()),
432 Some("local_only")
433 );
434 if local_only {
435 return Ok(local_contract.into_iter().collect());
436 }
437
438 let mut contracts = node
439 .metadata
440 .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
441 .cloned()
442 .map(|value| {
443 serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
444 VmError::Runtime(format!(
445 "workflow stage {node_id} verification contract metadata parse failed: {error}"
446 ))
447 })
448 })
449 .transpose()?
450 .unwrap_or_default();
451
452 if let Some(local_contract) = local_contract {
453 push_unique_contract(&mut contracts, local_contract);
454 }
455 Ok(contracts)
456}
457
458pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
459 match value {
460 serde_json::Value::Null => Vec::new(),
461 serde_json::Value::Array(items) => items
462 .iter()
463 .filter_map(|item| match item {
464 serde_json::Value::Object(map) => map
465 .get("name")
466 .and_then(|value| value.as_str())
467 .filter(|name| !name.is_empty())
468 .map(|name| name.to_string()),
469 _ => None,
470 })
471 .collect(),
472 serde_json::Value::Object(map) => {
473 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
474 return map
475 .get("tools")
476 .map(workflow_tool_names)
477 .unwrap_or_default();
478 }
479 map.get("name")
480 .and_then(|value| value.as_str())
481 .filter(|name| !name.is_empty())
482 .map(|name| vec![name.to_string()])
483 .unwrap_or_default()
484 }
485 _ => Vec::new(),
486 }
487}
488
489fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
490 fn rank(v: &str) -> usize {
491 match v {
492 "none" => 0,
493 "read_only" => 1,
494 "workspace_write" => 2,
495 "process_exec" => 3,
496 "network" => 4,
497 _ => 5,
498 }
499 }
500 levels.max_by_key(|level| rank(level))
501}
502
503fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
504 match value.and_then(|v| v.as_str()).unwrap_or("") {
505 "read" => ToolKind::Read,
506 "edit" => ToolKind::Edit,
507 "delete" => ToolKind::Delete,
508 "move" => ToolKind::Move,
509 "search" => ToolKind::Search,
510 "execute" => ToolKind::Execute,
511 "think" => ToolKind::Think,
512 "fetch" => ToolKind::Fetch,
513 _ => ToolKind::Other,
514 }
515}
516
517fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
518 let policy = map
519 .get("policy")
520 .and_then(|value| value.as_object())
521 .cloned()
522 .unwrap_or_default();
523
524 let capabilities = policy
525 .get("capabilities")
526 .and_then(|value| value.as_object())
527 .map(|caps| {
528 caps.iter()
529 .map(|(capability, ops)| {
530 let values = ops
531 .as_array()
532 .map(|items| {
533 items
534 .iter()
535 .filter_map(|item| item.as_str().map(|s| s.to_string()))
536 .collect::<Vec<_>>()
537 })
538 .unwrap_or_default();
539 (capability.clone(), values)
540 })
541 .collect::<BTreeMap<_, _>>()
542 })
543 .unwrap_or_default();
544
545 let arg_schema = if let Some(schema) = policy.get("arg_schema") {
548 serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
549 } else {
550 ToolArgSchema {
551 path_params: policy
552 .get("path_params")
553 .and_then(|value| value.as_array())
554 .map(|items| {
555 items
556 .iter()
557 .filter_map(|item| item.as_str().map(|s| s.to_string()))
558 .collect::<Vec<_>>()
559 })
560 .unwrap_or_default(),
561 arg_aliases: policy
562 .get("arg_aliases")
563 .and_then(|value| value.as_object())
564 .map(|aliases| {
565 aliases
566 .iter()
567 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
568 .collect::<BTreeMap<_, _>>()
569 })
570 .unwrap_or_default(),
571 required: policy
572 .get("required")
573 .and_then(|value| value.as_array())
574 .map(|items| {
575 items
576 .iter()
577 .filter_map(|item| item.as_str().map(|s| s.to_string()))
578 .collect::<Vec<_>>()
579 })
580 .unwrap_or_default(),
581 }
582 };
583
584 let kind = parse_tool_kind(policy.get("kind"));
585 let side_effect_level = policy
586 .get("side_effect_level")
587 .and_then(|value| value.as_str())
588 .map(SideEffectLevel::parse)
589 .unwrap_or_default();
590
591 ToolAnnotations {
592 kind,
593 side_effect_level,
594 arg_schema,
595 capabilities,
596 }
597}
598
599pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
600 match value {
601 serde_json::Value::Null => BTreeMap::new(),
602 serde_json::Value::Array(items) => items
603 .iter()
604 .filter_map(|item| match item {
605 serde_json::Value::Object(map) => map
606 .get("name")
607 .and_then(|value| value.as_str())
608 .filter(|name| !name.is_empty())
609 .map(|name| (name.to_string(), parse_tool_annotations(map))),
610 _ => None,
611 })
612 .collect(),
613 serde_json::Value::Object(map) => {
614 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
615 return map
616 .get("tools")
617 .map(workflow_tool_annotations)
618 .unwrap_or_default();
619 }
620 map.get("name")
621 .and_then(|value| value.as_str())
622 .filter(|name| !name.is_empty())
623 .map(|name| {
624 let mut annotations = BTreeMap::new();
625 annotations.insert(name.to_string(), parse_tool_annotations(map));
626 annotations
627 })
628 .unwrap_or_default()
629 }
630 _ => BTreeMap::new(),
631 }
632}
633
634pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
635 let tools = workflow_tool_names(value);
636 let tool_annotations = workflow_tool_annotations(value);
637 let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
638 for annotations in tool_annotations.values() {
639 for (capability, ops) in &annotations.capabilities {
640 let entry = capabilities.entry(capability.clone()).or_default();
641 for op in ops {
642 if !entry.contains(op) {
643 entry.push(op.clone());
644 }
645 }
646 entry.sort();
647 }
648 }
649 let side_effect_level = max_side_effect_level(
650 tool_annotations
651 .values()
652 .map(|annotations| annotations.side_effect_level.as_str().to_string())
653 .filter(|level| level != "none"),
654 );
655 CapabilityPolicy {
656 tools,
657 capabilities,
658 workspace_roots: Vec::new(),
659 side_effect_level,
660 recursion_limit: None,
661 tool_arg_constraints: Vec::new(),
662 tool_annotations,
663 }
664}
665
666#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
667#[serde(default)]
668pub struct WorkflowEdge {
669 pub from: String,
670 pub to: String,
671 pub branch: Option<String>,
672 pub label: Option<String>,
673}
674
675#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
676#[serde(default)]
677pub struct WorkflowGraph {
678 #[serde(rename = "_type")]
679 pub type_name: String,
680 pub id: String,
681 pub name: Option<String>,
682 pub version: usize,
683 pub entry: String,
684 pub nodes: BTreeMap<String, WorkflowNode>,
685 pub edges: Vec<WorkflowEdge>,
686 pub capability_policy: CapabilityPolicy,
687 pub approval_policy: super::ToolApprovalPolicy,
688 pub metadata: BTreeMap<String, serde_json::Value>,
689 pub audit_log: Vec<WorkflowAuditEntry>,
690}
691
692#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
693#[serde(default)]
694pub struct WorkflowAuditEntry {
695 pub id: String,
696 pub op: String,
697 pub node_id: Option<String>,
698 pub timestamp: String,
699 pub reason: Option<String>,
700 pub metadata: BTreeMap<String, serde_json::Value>,
701}
702
703#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
704#[serde(default)]
705pub struct WorkflowValidationReport {
706 pub valid: bool,
707 pub errors: Vec<String>,
708 pub warnings: Vec<String>,
709 pub reachable_nodes: Vec<String>,
710}
711
712pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
713 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
714 let dict = value.as_dict();
715 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
716 node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
717 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
718 node.raw_context_assembler = dict.and_then(|d| d.get("context_assembler")).cloned();
719 Ok(node)
720}
721
722pub fn parse_workflow_node_json(
723 json: serde_json::Value,
724 label: &str,
725) -> Result<WorkflowNode, VmError> {
726 super::parse_json_payload(json, label)
727}
728
729pub fn parse_workflow_edge_json(
730 json: serde_json::Value,
731 label: &str,
732) -> Result<WorkflowEdge, VmError> {
733 super::parse_json_payload(json, label)
734}
735
736pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
737 let mut graph: WorkflowGraph = super::parse_json_value(value)?;
738 let as_dict = value.as_dict().cloned().unwrap_or_default();
739
740 if graph.nodes.is_empty() {
741 for key in ["act", "verify", "repair"] {
742 if let Some(node_value) = as_dict.get(key) {
743 let mut node = parse_workflow_node_value(node_value, "orchestration")?;
744 let raw_node = node_value.as_dict().cloned().unwrap_or_default();
745 node.id = Some(key.to_string());
746 if node.kind.is_empty() {
747 node.kind = if key == "verify" {
748 "verify".to_string()
749 } else {
750 "stage".to_string()
751 };
752 }
753 if node.model_policy.provider.is_none() {
754 node.model_policy.provider = as_dict
755 .get("provider")
756 .map(|value| value.display())
757 .filter(|value| !value.is_empty());
758 }
759 if node.model_policy.model.is_none() {
760 node.model_policy.model = as_dict
761 .get("model")
762 .map(|value| value.display())
763 .filter(|value| !value.is_empty());
764 }
765 if node.model_policy.model_tier.is_none() {
766 node.model_policy.model_tier = as_dict
767 .get("model_tier")
768 .or_else(|| as_dict.get("tier"))
769 .map(|value| value.display())
770 .filter(|value| !value.is_empty());
771 }
772 if node.model_policy.temperature.is_none() {
773 node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
774 if let VmValue::Float(number) = value {
775 Some(*number)
776 } else {
777 value.as_int().map(|number| number as f64)
778 }
779 });
780 }
781 if node.model_policy.max_tokens.is_none() {
782 node.model_policy.max_tokens =
783 as_dict.get("max_tokens").and_then(|value| value.as_int());
784 }
785 if node.mode.is_none() {
786 node.mode = as_dict
787 .get("mode")
788 .map(|value| value.display())
789 .filter(|value| !value.is_empty());
790 }
791 if node.done_sentinel.is_none() {
792 node.done_sentinel = as_dict
793 .get("done_sentinel")
794 .map(|value| value.display())
795 .filter(|value| !value.is_empty());
796 }
797 if key == "verify"
798 && node.verify.is_none()
799 && (raw_node.contains_key("assert_text")
800 || raw_node.contains_key("command")
801 || raw_node.contains_key("expect_status")
802 || raw_node.contains_key("expect_text"))
803 {
804 node.verify = Some(serde_json::json!({
805 "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
806 "command": raw_node.get("command").map(vm_value_to_json),
807 "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
808 "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
809 }));
810 }
811 graph.nodes.insert(key.to_string(), node);
812 }
813 }
814 if graph.entry.is_empty() && graph.nodes.contains_key("act") {
815 graph.entry = "act".to_string();
816 }
817 if graph.edges.is_empty() && graph.nodes.contains_key("act") {
818 if graph.nodes.contains_key("verify") {
819 graph.edges.push(WorkflowEdge {
820 from: "act".to_string(),
821 to: "verify".to_string(),
822 branch: None,
823 label: None,
824 });
825 }
826 if graph.nodes.contains_key("repair") {
827 graph.edges.push(WorkflowEdge {
828 from: "verify".to_string(),
829 to: "repair".to_string(),
830 branch: Some("failed".to_string()),
831 label: None,
832 });
833 graph.edges.push(WorkflowEdge {
834 from: "repair".to_string(),
835 to: "verify".to_string(),
836 branch: Some("retry".to_string()),
837 label: None,
838 });
839 }
840 }
841 }
842
843 if graph.type_name.is_empty() {
844 graph.type_name = "workflow_graph".to_string();
845 }
846 if graph.id.is_empty() {
847 graph.id = new_id("workflow");
848 }
849 if graph.version == 0 {
850 graph.version = 1;
851 }
852 if graph.entry.is_empty() {
853 graph.entry = graph
854 .nodes
855 .keys()
856 .next()
857 .cloned()
858 .unwrap_or_else(|| "act".to_string());
859 }
860 for (node_id, node) in &mut graph.nodes {
861 if node.raw_tools.is_none() {
862 node.raw_tools = as_dict
863 .get("nodes")
864 .and_then(|nodes| nodes.as_dict())
865 .and_then(|nodes| nodes.get(node_id))
866 .and_then(|node_value| node_value.as_dict())
867 .and_then(|raw_node| raw_node.get("tools"))
868 .cloned();
869 }
870 if node.id.is_none() {
871 node.id = Some(node_id.clone());
872 }
873 if node.kind.is_empty() {
874 node.kind = "stage".to_string();
875 }
876 if node.join_policy.strategy.is_empty() {
877 node.join_policy.strategy = "all".to_string();
878 }
879 if node.reduce_policy.strategy.is_empty() {
880 node.reduce_policy.strategy = "concat".to_string();
881 }
882 if node.output_contract.output_kinds.is_empty() {
883 node.output_contract.output_kinds = vec![match node.kind.as_str() {
884 "verify" => "verification_result".to_string(),
885 "reduce" => node
886 .reduce_policy
887 .output_kind
888 .clone()
889 .unwrap_or_else(|| "summary".to_string()),
890 "map" => node
891 .map_policy
892 .output_kind
893 .clone()
894 .unwrap_or_else(|| "artifact".to_string()),
895 "escalation" => "plan".to_string(),
896 _ => "artifact".to_string(),
897 }];
898 }
899 if node.retry_policy.max_attempts == 0 {
900 node.retry_policy.max_attempts = 1;
901 }
902 }
903 Ok(graph)
904}
905
906pub fn validate_workflow(
907 graph: &WorkflowGraph,
908 ceiling: Option<&CapabilityPolicy>,
909) -> WorkflowValidationReport {
910 let mut errors = Vec::new();
911 let mut warnings = Vec::new();
912
913 if !graph.nodes.contains_key(&graph.entry) {
914 errors.push(format!("entry node does not exist: {}", graph.entry));
915 }
916
917 let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
918 for edge in &graph.edges {
919 if !node_ids.contains(&edge.from) {
920 errors.push(format!("edge.from references unknown node: {}", edge.from));
921 }
922 if !node_ids.contains(&edge.to) {
923 errors.push(format!("edge.to references unknown node: {}", edge.to));
924 }
925 }
926
927 let reachable_nodes = reachable_nodes(graph);
928 for node_id in &node_ids {
929 if !reachable_nodes.contains(node_id) {
930 warnings.push(format!("node is unreachable: {node_id}"));
931 }
932 }
933
934 for (node_id, node) in &graph.nodes {
935 let incoming = graph
936 .edges
937 .iter()
938 .filter(|edge| edge.to == *node_id)
939 .count();
940 let outgoing: Vec<&WorkflowEdge> = graph
941 .edges
942 .iter()
943 .filter(|edge| edge.from == *node_id)
944 .collect();
945 if let Some(min_inputs) = node.input_contract.min_inputs {
946 if let Some(max_inputs) = node.input_contract.max_inputs {
947 if min_inputs > max_inputs {
948 errors.push(format!(
949 "node {node_id}: input contract min_inputs exceeds max_inputs"
950 ));
951 }
952 }
953 }
954 match node.kind.as_str() {
955 "condition" => {
956 let has_true = outgoing
957 .iter()
958 .any(|edge| edge.branch.as_deref() == Some("true"));
959 let has_false = outgoing
960 .iter()
961 .any(|edge| edge.branch.as_deref() == Some("false"));
962 if !has_true || !has_false {
963 errors.push(format!(
964 "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
965 ));
966 }
967 }
968 "fork" if outgoing.len() < 2 => {
969 errors.push(format!(
970 "node {node_id}: fork nodes require at least two outgoing edges"
971 ));
972 }
973 "join" if incoming < 2 => {
974 warnings.push(format!(
975 "node {node_id}: join node has fewer than two incoming edges"
976 ));
977 }
978 "map"
979 if node.map_policy.items.is_empty()
980 && node.map_policy.item_artifact_kind.is_none()
981 && node.input_contract.input_kinds.is_empty() =>
982 {
983 errors.push(format!(
984 "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
985 ));
986 }
987 "reduce" if node.input_contract.input_kinds.is_empty() => {
988 warnings.push(format!(
989 "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
990 ));
991 }
992 _ => {}
993 }
994 }
995
996 if let Some(ceiling) = ceiling {
997 if let Err(error) = ceiling.intersect(&graph.capability_policy) {
998 errors.push(error);
999 }
1000 for (node_id, node) in &graph.nodes {
1001 if let Err(error) = ceiling.intersect(&node.capability_policy) {
1002 errors.push(format!("node {node_id}: {error}"));
1003 }
1004 }
1005 }
1006
1007 WorkflowValidationReport {
1008 valid: errors.is_empty(),
1009 errors,
1010 warnings,
1011 reachable_nodes: reachable_nodes.into_iter().collect(),
1012 }
1013}
1014
1015fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
1016 let mut seen = BTreeSet::new();
1017 let mut stack = vec![graph.entry.clone()];
1018 while let Some(node_id) = stack.pop() {
1019 if !seen.insert(node_id.clone()) {
1020 continue;
1021 }
1022 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
1023 stack.push(edge.to.clone());
1024 }
1025 }
1026 seen
1027}
1028
1029fn resolve_stage_skill_registry(node: &WorkflowNode) -> Option<VmValue> {
1041 let per_node = node
1042 .raw_model_policy
1043 .as_ref()
1044 .and_then(|v| v.as_dict())
1045 .and_then(|d| d.get("skills"))
1046 .cloned()
1047 .and_then(normalize_inline_registry);
1048 if per_node.is_some() {
1049 return per_node;
1050 }
1051 super::current_workflow_skill_context().and_then(|ctx| ctx.registry)
1052}
1053
1054fn resolve_stage_skill_match(node: &WorkflowNode) -> crate::llm::SkillMatchConfig {
1058 let per_node = node
1059 .raw_model_policy
1060 .as_ref()
1061 .and_then(|v| v.as_dict())
1062 .and_then(|d| d.get("skill_match"))
1063 .and_then(|v| v.as_dict().cloned());
1064 if let Some(dict) = per_node {
1065 return crate::llm::parse_skill_match_config_dict(&dict);
1066 }
1067 super::current_workflow_skill_context()
1068 .and_then(|ctx| ctx.match_config)
1069 .and_then(|v| v.as_dict().cloned())
1070 .map(|d| crate::llm::parse_skill_match_config_dict(&d))
1071 .unwrap_or_default()
1072}
1073
1074fn normalize_inline_registry(value: VmValue) -> Option<VmValue> {
1079 use std::collections::BTreeMap;
1080 use std::rc::Rc;
1081 match &value {
1082 VmValue::Dict(d)
1083 if d.get("_type")
1084 .map(|v| v.display() == "skill_registry")
1085 .unwrap_or(false) =>
1086 {
1087 Some(value)
1088 }
1089 VmValue::List(list) => {
1090 let mut dict = BTreeMap::new();
1091 dict.insert(
1092 "_type".to_string(),
1093 VmValue::String(Rc::from("skill_registry")),
1094 );
1095 dict.insert("skills".to_string(), VmValue::List(list.clone()));
1096 Some(VmValue::Dict(Rc::new(dict)))
1097 }
1098 _ => None,
1099 }
1100}
1101
1102fn resolve_node_session_id(node: &WorkflowNode) -> String {
1103 if let Some(explicit) = node
1104 .raw_model_policy
1105 .as_ref()
1106 .and_then(|v| v.as_dict())
1107 .and_then(|d| d.get("session_id"))
1108 .and_then(|v| match v {
1109 VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
1110 _ => None,
1111 })
1112 {
1113 return explicit;
1114 }
1115 if let Some(persisted) = node
1116 .metadata
1117 .get("worker_session_id")
1118 .and_then(|value| value.as_str())
1119 .filter(|value| !value.trim().is_empty())
1120 {
1121 return persisted.to_string();
1122 }
1123 format!("workflow_stage_{}", uuid::Uuid::now_v7())
1124}
1125
1126fn raw_auto_compact_dict(
1127 node: &WorkflowNode,
1128) -> Option<&std::collections::BTreeMap<String, VmValue>> {
1129 node.raw_auto_compact
1130 .as_ref()
1131 .and_then(|value| value.as_dict())
1132}
1133
1134fn raw_auto_compact_int(node: &WorkflowNode, key: &str) -> Option<usize> {
1135 raw_auto_compact_dict(node)
1136 .and_then(|dict| dict.get(key))
1137 .and_then(|value| value.as_int())
1138 .filter(|value| *value >= 0)
1139 .map(|value| value as usize)
1140}
1141
1142fn raw_auto_compact_string(node: &WorkflowNode, key: &str) -> Option<String> {
1143 raw_auto_compact_dict(node)
1144 .and_then(|dict| dict.get(key))
1145 .and_then(|value| match value {
1146 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1147 _ => None,
1148 })
1149}
1150
1151pub(crate) async fn resolve_stage_auto_compact(
1152 node: &WorkflowNode,
1153 opts: &crate::llm::api::LlmCallOptions,
1154) -> Result<Option<crate::orchestration::AutoCompactConfig>, VmError> {
1155 if !node.auto_compact.enabled {
1156 return Ok(None);
1157 }
1158
1159 let mut ac = crate::orchestration::AutoCompactConfig::default();
1160 if let Some(v) = node.auto_compact.token_threshold {
1161 ac.token_threshold = v;
1162 }
1163 if let Some(v) = node.auto_compact.tool_output_max_chars {
1164 ac.tool_output_max_chars = v;
1165 }
1166 if let Some(ref strategy) = node.auto_compact.compact_strategy {
1167 if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1168 ac.compact_strategy = s;
1169 }
1170 }
1171 if let Some(v) = node.auto_compact.hard_limit_tokens {
1172 ac.hard_limit_tokens = Some(v);
1173 }
1174 if let Some(ref strategy) = node.auto_compact.hard_limit_strategy {
1175 if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1176 ac.hard_limit_strategy = s;
1177 }
1178 }
1179
1180 if let Some(v) = raw_auto_compact_int(node, "compact_keep_last")
1184 .or_else(|| raw_auto_compact_int(node, "keep_last"))
1185 {
1186 ac.keep_last = v;
1187 }
1188 if let Some(prompt) = raw_auto_compact_string(node, "summarize_prompt") {
1189 ac.summarize_prompt = Some(prompt);
1190 }
1191
1192 if let Some(dict) = raw_auto_compact_dict(node) {
1195 if let Some(cb) = dict.get("compress_callback") {
1196 ac.compress_callback = Some(cb.clone());
1197 }
1198 if let Some(cb) = dict.get("mask_callback") {
1199 ac.mask_callback = Some(cb.clone());
1200 }
1201 if let Some(cb) = dict.get("custom_compactor") {
1202 ac.custom_compactor = Some(cb.clone());
1203 }
1204 }
1205
1206 let user_specified_threshold = node.auto_compact.token_threshold.is_some();
1207 let user_specified_hard_limit = node.auto_compact.hard_limit_tokens.is_some();
1208 crate::llm::api::adapt_auto_compact_to_provider(
1209 &mut ac,
1210 user_specified_threshold,
1211 user_specified_hard_limit,
1212 &opts.provider,
1213 &opts.model,
1214 &opts.api_key,
1215 )
1216 .await;
1217
1218 Ok(Some(ac))
1219}
1220
1221pub async fn execute_stage_node(
1222 node_id: &str,
1223 node: &WorkflowNode,
1224 task: &str,
1225 artifacts: &[ArtifactRecord],
1226) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1227 let mut selection_policy = node.context_policy.clone();
1228 if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
1229 selection_policy.include_kinds = node.input_contract.input_kinds.clone();
1230 }
1231 let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
1232 let rendered_context = if let Some(assembler) = node.raw_context_assembler.as_ref() {
1233 let assembled =
1234 crate::stdlib::assemble::assemble_from_options(&selected, assembler).await?;
1235 super::render_assembled_chunks(&assembled)
1236 } else {
1237 super::render_artifacts_context(&selected, &node.context_policy)
1238 };
1239 let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1240 let rendered_verification = super::render_verification_context(&verification_contracts);
1241 let stage_session_id = resolve_node_session_id(node);
1242 if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1243 return Err(VmError::Runtime(format!(
1244 "workflow stage {node_id} requires an existing session \
1245 (call agent_session_open and feed session_id through model_policy \
1246 before entering this stage)"
1247 )));
1248 }
1249 if let Some(min_inputs) = node.input_contract.min_inputs {
1250 if selected.len() < min_inputs {
1251 return Err(VmError::Runtime(format!(
1252 "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1253 )));
1254 }
1255 }
1256 if let Some(max_inputs) = node.input_contract.max_inputs {
1257 if selected.len() > max_inputs {
1258 return Err(VmError::Runtime(format!(
1259 "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1260 )));
1261 }
1262 }
1263 let prompt = super::render_workflow_prompt(
1264 task,
1265 node.task_label.as_deref(),
1266 &rendered_verification,
1267 &rendered_context,
1268 );
1269
1270 let tool_format = node
1278 .model_policy
1279 .tool_format
1280 .clone()
1281 .filter(|value| !value.trim().is_empty())
1282 .or_else(|| {
1283 std::env::var("HARN_AGENT_TOOL_FORMAT")
1284 .ok()
1285 .filter(|value| !value.trim().is_empty())
1286 })
1287 .unwrap_or_else(|| {
1288 let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
1289 let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
1290 crate::llm_config::default_tool_format(&model, &provider)
1291 });
1292 let mut llm_result = if node.kind == "verify" {
1293 if let Some(command) = node
1294 .verify
1295 .as_ref()
1296 .and_then(|verify| verify.as_object())
1297 .and_then(|verify| verify.get("command"))
1298 .and_then(|value| value.as_str())
1299 .map(str::trim)
1300 .filter(|value| !value.is_empty())
1301 {
1302 let (program, args) = if cfg!(target_os = "windows") {
1303 ("cmd", vec!["/C".to_string(), command.to_string()])
1304 } else {
1305 ("/bin/sh", vec!["-lc".to_string(), command.to_string()])
1306 };
1307 let mut process_config = crate::stdlib::sandbox::ProcessCommandConfig {
1308 stdin_null: true,
1309 ..Default::default()
1310 };
1311 if let Some(context) = crate::stdlib::process::current_execution_context() {
1312 if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1313 crate::stdlib::sandbox::enforce_process_cwd(std::path::Path::new(&cwd))?;
1314 process_config.cwd = Some(std::path::PathBuf::from(cwd));
1315 }
1316 if !context.env.is_empty() {
1317 process_config.env.extend(context.env);
1318 }
1319 }
1320 let output = crate::stdlib::sandbox::command_output(program, &args, &process_config)?;
1321 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1322 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1323 let combined = if stderr.is_empty() {
1324 stdout.clone()
1325 } else if stdout.is_empty() {
1326 stderr.clone()
1327 } else {
1328 format!("{stdout}\n{stderr}")
1329 };
1330 serde_json::json!({
1331 "status": "completed",
1332 "text": combined,
1333 "visible_text": combined,
1334 "command": command,
1335 "stdout": stdout,
1336 "stderr": stderr,
1337 "exit_status": output.status.code().unwrap_or(-1),
1338 "success": output.status.success(),
1339 })
1340 } else {
1341 serde_json::json!({
1342 "status": "completed",
1343 "text": "",
1344 "visible_text": "",
1345 })
1346 }
1347 } else {
1348 let mut options = BTreeMap::new();
1349 if let Some(provider) = &node.model_policy.provider {
1350 options.insert(
1351 "provider".to_string(),
1352 VmValue::String(Rc::from(provider.clone())),
1353 );
1354 }
1355 if let Some(model) = &node.model_policy.model {
1356 options.insert(
1357 "model".to_string(),
1358 VmValue::String(Rc::from(model.clone())),
1359 );
1360 }
1361 if let Some(model_tier) = &node.model_policy.model_tier {
1362 options.insert(
1363 "model_tier".to_string(),
1364 VmValue::String(Rc::from(model_tier.clone())),
1365 );
1366 }
1367 if let Some(temperature) = node.model_policy.temperature {
1368 options.insert("temperature".to_string(), VmValue::Float(temperature));
1369 }
1370 if let Some(max_tokens) = node.model_policy.max_tokens {
1371 options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
1372 }
1373 let tool_names = workflow_tool_names(&node.tools);
1374 let tools_value = node.raw_tools.clone().or_else(|| {
1375 if matches!(node.tools, serde_json::Value::Null) {
1376 None
1377 } else {
1378 Some(crate::stdlib::json_to_vm_value(&node.tools))
1379 }
1380 });
1381 if tools_value.is_some() && !tool_names.is_empty() {
1382 options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
1383 }
1384 options.insert(
1385 "session_id".to_string(),
1386 VmValue::String(Rc::from(stage_session_id.clone())),
1387 );
1388
1389 let args = vec![
1390 VmValue::String(Rc::from(prompt.clone())),
1391 node.system
1392 .clone()
1393 .map(|s| VmValue::String(Rc::from(s)))
1394 .unwrap_or(VmValue::Nil),
1395 VmValue::Dict(Rc::new(options)),
1396 ];
1397 let mut opts = extract_llm_options(&args)?;
1398 let auto_compact = resolve_stage_auto_compact(node, &opts).await?;
1399
1400 if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
1401 let tool_policy = workflow_tool_policy_from_tools(&node.tools);
1402 let effective_policy = tool_policy
1403 .intersect(&node.capability_policy)
1404 .map_err(VmError::Runtime)?;
1405 let permissions = crate::llm::permissions::parse_dynamic_permission_policy(
1406 node.raw_model_policy
1407 .as_ref()
1408 .and_then(|value| value.as_dict())
1409 .and_then(|dict| dict.get("permissions")),
1410 "workflow model_policy",
1411 )?;
1412 crate::llm::run_agent_loop_internal(
1413 &mut opts,
1414 crate::llm::AgentLoopConfig {
1415 persistent: true,
1416 max_iterations: node.model_policy.max_iterations.unwrap_or(16),
1417 max_nudges: node.model_policy.max_nudges.unwrap_or(3),
1418 nudge: node.model_policy.nudge.clone(),
1419 done_sentinel: node.done_sentinel.clone(),
1420 break_unless_phase: None,
1421 tool_retries: 0,
1422 tool_backoff_ms: 1000,
1423 tool_format: tool_format.clone(),
1424 native_tool_fallback: node.model_policy.native_tool_fallback,
1425 auto_compact,
1426 policy: Some(effective_policy),
1427 permissions,
1428 approval_policy: Some(node.approval_policy.clone()),
1429 daemon: false,
1430 daemon_config: Default::default(),
1431 llm_retries: 2,
1432 llm_backoff_ms: 2000,
1433 token_budget: None,
1434 exit_when_verified: node.exit_when_verified,
1435 loop_detect_warn: 2,
1436 loop_detect_block: 3,
1437 loop_detect_skip: 4,
1438 tool_examples: node.model_policy.tool_examples.clone(),
1439 turn_policy: node.model_policy.turn_policy.clone(),
1440 stop_after_successful_tools: node
1441 .model_policy
1442 .stop_after_successful_tools
1443 .clone(),
1444 require_successful_tools: node.model_policy.require_successful_tools.clone(),
1445 session_id: stage_session_id.clone(),
1449 event_sink: None,
1450 task_ledger: node
1454 .raw_model_policy
1455 .as_ref()
1456 .and_then(|v| v.as_dict())
1457 .and_then(|d| d.get("task_ledger"))
1458 .map(crate::llm::helpers::vm_value_to_json)
1459 .and_then(|json| serde_json::from_value(json).ok())
1460 .unwrap_or_default(),
1461 post_turn_callback: node
1462 .raw_model_policy
1463 .as_ref()
1464 .and_then(|v| v.as_dict())
1465 .and_then(|d| d.get("post_turn_callback"))
1466 .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
1467 .cloned(),
1468 skill_registry: resolve_stage_skill_registry(node),
1475 skill_match: resolve_stage_skill_match(node),
1476 working_files: Vec::new(),
1477 },
1478 )
1479 .await?
1480 } else {
1481 let result = vm_call_llm_full(&opts).await?;
1482 crate::llm::agent_loop_result_from_llm(&result, opts)
1483 }
1484 };
1485 if let Some(payload) = llm_result.as_object_mut() {
1486 payload.insert("prompt".to_string(), serde_json::json!(prompt));
1487 payload.insert(
1488 "system_prompt".to_string(),
1489 serde_json::json!(node.system.clone().unwrap_or_default()),
1490 );
1491 payload.insert(
1492 "rendered_context".to_string(),
1493 serde_json::json!(rendered_context),
1494 );
1495 if !verification_contracts.is_empty() {
1496 payload.insert(
1497 "verification_contracts".to_string(),
1498 serde_json::to_value(&verification_contracts).unwrap_or_default(),
1499 );
1500 payload.insert(
1501 "rendered_verification_context".to_string(),
1502 serde_json::json!(rendered_verification),
1503 );
1504 }
1505 payload.insert(
1506 "selected_artifact_ids".to_string(),
1507 serde_json::json!(selected
1508 .iter()
1509 .map(|artifact| artifact.id.clone())
1510 .collect::<Vec<_>>()),
1511 );
1512 payload.insert(
1513 "selected_artifact_titles".to_string(),
1514 serde_json::json!(selected
1515 .iter()
1516 .map(|artifact| artifact.title.clone())
1517 .collect::<Vec<_>>()),
1518 );
1519 match payload
1520 .entry("tools".to_string())
1521 .or_insert_with(|| serde_json::json!({}))
1522 {
1523 serde_json::Value::Object(tools) => {
1524 tools.insert("mode".to_string(), serde_json::json!(tool_format.clone()));
1525 }
1526 slot => {
1527 *slot = serde_json::json!({ "mode": tool_format.clone() });
1528 }
1529 }
1530 }
1531
1532 let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1533 let result_transcript = llm_result
1537 .get("transcript")
1538 .cloned()
1539 .map(|value| crate::stdlib::json_to_vm_value(&value));
1540 let session_transcript = crate::agent_sessions::snapshot(&stage_session_id);
1541 let transcript = result_transcript
1542 .or(session_transcript)
1543 .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1544 let output_kind = node
1545 .output_contract
1546 .output_kinds
1547 .first()
1548 .cloned()
1549 .unwrap_or_else(|| {
1550 if node.kind == "verify" {
1551 "verification_result".to_string()
1552 } else {
1553 "artifact".to_string()
1554 }
1555 });
1556 let mut metadata = BTreeMap::new();
1557 metadata.insert(
1558 "input_artifact_ids".to_string(),
1559 serde_json::json!(selected
1560 .iter()
1561 .map(|artifact| artifact.id.clone())
1562 .collect::<Vec<_>>()),
1563 );
1564 metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1565 if !node.approval_policy.write_path_allowlist.is_empty() {
1566 metadata.insert(
1567 "changed_paths".to_string(),
1568 serde_json::json!(node.approval_policy.write_path_allowlist),
1569 );
1570 }
1571 let artifact = ArtifactRecord {
1572 type_name: "artifact".to_string(),
1573 id: new_id("artifact"),
1574 kind: output_kind,
1575 title: Some(format!("stage {node_id} output")),
1576 text: Some(visible_text),
1577 data: Some(llm_result.clone()),
1578 source: Some(node_id.to_string()),
1579 created_at: now_rfc3339(),
1580 freshness: Some("fresh".to_string()),
1581 priority: None,
1582 lineage: selected
1583 .iter()
1584 .map(|artifact| artifact.id.clone())
1585 .collect(),
1586 relevance: Some(1.0),
1587 estimated_tokens: None,
1588 stage: Some(node_id.to_string()),
1589 metadata,
1590 }
1591 .normalize();
1592
1593 Ok((llm_result, vec![artifact], transcript))
1594}
1595
1596pub fn next_nodes_for(
1597 graph: &WorkflowGraph,
1598 current: &str,
1599 branch: Option<&str>,
1600) -> Vec<WorkflowEdge> {
1601 let mut matching: Vec<WorkflowEdge> = graph
1602 .edges
1603 .iter()
1604 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1605 .cloned()
1606 .collect();
1607 if matching.is_empty() {
1608 matching = graph
1609 .edges
1610 .iter()
1611 .filter(|edge| edge.from == current && edge.branch.is_none())
1612 .cloned()
1613 .collect();
1614 }
1615 matching
1616}
1617
1618pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1619 next_nodes_for(graph, current, Some(branch))
1620 .into_iter()
1621 .next()
1622 .map(|edge| edge.to)
1623}
1624
1625pub fn append_audit_entry(
1626 graph: &mut WorkflowGraph,
1627 op: &str,
1628 node_id: Option<String>,
1629 reason: Option<String>,
1630 metadata: BTreeMap<String, serde_json::Value>,
1631) {
1632 graph.audit_log.push(WorkflowAuditEntry {
1633 id: new_id("audit"),
1634 op: op.to_string(),
1635 node_id,
1636 timestamp: now_rfc3339(),
1637 reason,
1638 metadata,
1639 });
1640}