1use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 apply_input_transcript_policy, apply_output_transcript_policy, new_id, now_rfc3339,
10 ArtifactRecord, BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy,
11 MapPolicy, ModelPolicy, ReducePolicy, RetryPolicy, StageContract, ToolRuntimePolicyMetadata,
12 TranscriptPolicy,
13};
14use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
15use crate::value::{VmError, VmValue};
16
/// A single node of a workflow graph, as declared by the workflow author.
///
/// Every field is optional on input (`#[serde(default)]`); `normalize_workflow_value`
/// fills in the remaining defaults (id, kind, join/reduce strategy, output kind,
/// retry attempts).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier; set from the graph's map key during normalization when absent.
    pub id: Option<String>,
    /// Node kind. Values handled in this file: "stage", "verify", "condition",
    /// "fork", "join", "map", "reduce" and "escalation".
    pub kind: String,
    /// Execution mode; `Some("agent")` makes `execute_stage_node` use the agent loop.
    pub mode: Option<String>,
    pub prompt: Option<String>,
    /// System prompt handed to the LLM call for this node.
    pub system: Option<String>,
    /// Heading placed before the task text when artifact context is rendered
    /// ("Task" when unset).
    pub task_label: Option<String>,
    /// Forwarded unchanged to the agent loop configuration.
    pub done_sentinel: Option<String>,
    /// Tool declarations as JSON: a tool object, an array of tool objects, or a
    /// `{"_type": "tool_registry", "tools": ...}` wrapper.
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    pub transcript_policy: TranscriptPolicy,
    pub context_policy: ContextPolicy,
    pub retry_policy: RetryPolicy,
    /// Capability limits; intersected with the tool-derived policy before the agent loop runs.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub input_contract: StageContract,
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification settings; for "verify" nodes the `command` key is executed
    /// as a shell command by `execute_stage_node`.
    pub verify: Option<serde_json::Value>,
    /// Forwarded unchanged to the agent loop configuration.
    #[serde(default)]
    pub exit_when_verified: bool,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Original VM value of `tools`; never (de)serialized.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Original VM value of `transcript_policy`; never (de)serialized. Read for
    /// `compress_callback` / `mask_callback` when auto-compaction is enabled.
    #[serde(skip)]
    pub raw_transcript_policy: Option<VmValue>,
    /// Original VM value of `model_policy`; never (de)serialized. Read for
    /// `post_turn_callback`, `on_tool_call` and `on_tool_result`.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
}
60
61impl PartialEq for WorkflowNode {
62 fn eq(&self, other: &Self) -> bool {
63 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
64 }
65}
66
67pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
68 match value {
69 serde_json::Value::Null => Vec::new(),
70 serde_json::Value::Array(items) => items
71 .iter()
72 .filter_map(|item| match item {
73 serde_json::Value::Object(map) => map
74 .get("name")
75 .and_then(|value| value.as_str())
76 .filter(|name| !name.is_empty())
77 .map(|name| name.to_string()),
78 _ => None,
79 })
80 .collect(),
81 serde_json::Value::Object(map) => {
82 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
83 return map
84 .get("tools")
85 .map(workflow_tool_names)
86 .unwrap_or_default();
87 }
88 map.get("name")
89 .and_then(|value| value.as_str())
90 .filter(|name| !name.is_empty())
91 .map(|name| vec![name.to_string()])
92 .unwrap_or_default()
93 }
94 _ => Vec::new(),
95 }
96}
97
/// Returns the most severe side-effect level among `levels`, or `None` when the
/// iterator is empty.
///
/// Known levels, from least to most severe: `none`, `read_only`,
/// `workspace_write`, `process_exec`, `network`. Any unrecognized level ranks
/// above all known ones. When several entries share the highest rank, the last
/// one encountered is returned.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const KNOWN_LEVELS: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    // Position in `KNOWN_LEVELS`, or one past the end for unknown levels.
    let severity = |level: &str| {
        KNOWN_LEVELS
            .iter()
            .position(|known| *known == level)
            .unwrap_or(KNOWN_LEVELS.len())
    };
    // `>=` keeps the later element on ties.
    levels.reduce(|best, candidate| {
        if severity(&candidate) >= severity(&best) {
            candidate
        } else {
            best
        }
    })
}
111
112fn parse_tool_runtime_policy(
113 map: &serde_json::Map<String, serde_json::Value>,
114) -> ToolRuntimePolicyMetadata {
115 let Some(policy) = map.get("policy").and_then(|value| value.as_object()) else {
116 return ToolRuntimePolicyMetadata::default();
117 };
118
119 let capabilities = policy
120 .get("capabilities")
121 .and_then(|value| value.as_object())
122 .map(|caps| {
123 caps.iter()
124 .map(|(capability, ops)| {
125 let values = ops
126 .as_array()
127 .map(|items| {
128 items
129 .iter()
130 .filter_map(|item| item.as_str().map(|s| s.to_string()))
131 .collect::<Vec<_>>()
132 })
133 .unwrap_or_default();
134 (capability.clone(), values)
135 })
136 .collect::<BTreeMap<_, _>>()
137 })
138 .unwrap_or_default();
139
140 let path_params = policy
141 .get("path_params")
142 .and_then(|value| value.as_array())
143 .map(|items| {
144 items
145 .iter()
146 .filter_map(|item| item.as_str().map(|s| s.to_string()))
147 .collect::<Vec<_>>()
148 })
149 .unwrap_or_default();
150
151 ToolRuntimePolicyMetadata {
152 capabilities,
153 side_effect_level: policy
154 .get("side_effect_level")
155 .and_then(|value| value.as_str())
156 .map(|s| s.to_string()),
157 path_params,
158 mutation_classification: policy
159 .get("mutation_classification")
160 .and_then(|value| value.as_str())
161 .map(|s| s.to_string()),
162 }
163}
164
165pub fn workflow_tool_metadata(
166 value: &serde_json::Value,
167) -> BTreeMap<String, ToolRuntimePolicyMetadata> {
168 match value {
169 serde_json::Value::Null => BTreeMap::new(),
170 serde_json::Value::Array(items) => items
171 .iter()
172 .filter_map(|item| match item {
173 serde_json::Value::Object(map) => map
174 .get("name")
175 .and_then(|value| value.as_str())
176 .filter(|name| !name.is_empty())
177 .map(|name| (name.to_string(), parse_tool_runtime_policy(map))),
178 _ => None,
179 })
180 .collect(),
181 serde_json::Value::Object(map) => {
182 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
183 return map
184 .get("tools")
185 .map(workflow_tool_metadata)
186 .unwrap_or_default();
187 }
188 map.get("name")
189 .and_then(|value| value.as_str())
190 .filter(|name| !name.is_empty())
191 .map(|name| {
192 let mut metadata = BTreeMap::new();
193 metadata.insert(name.to_string(), parse_tool_runtime_policy(map));
194 metadata
195 })
196 .unwrap_or_default()
197 }
198 _ => BTreeMap::new(),
199 }
200}
201
202pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
203 let tools = workflow_tool_names(value);
204 let tool_metadata = workflow_tool_metadata(value);
205 let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
206 for metadata in tool_metadata.values() {
207 for (capability, ops) in &metadata.capabilities {
208 let entry = capabilities.entry(capability.clone()).or_default();
209 for op in ops {
210 if !entry.contains(op) {
211 entry.push(op.clone());
212 }
213 }
214 entry.sort();
215 }
216 }
217 let side_effect_level = max_side_effect_level(
218 tool_metadata
219 .values()
220 .filter_map(|metadata| metadata.side_effect_level.clone()),
221 );
222 CapabilityPolicy {
223 tools,
224 capabilities,
225 workspace_roots: Vec::new(),
226 side_effect_level,
227 recursion_limit: None,
228 tool_arg_constraints: Vec::new(),
229 tool_metadata,
230 }
231}
232
/// A directed connection between two workflow nodes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the source node.
    pub from: String,
    /// Id of the target node.
    pub to: String,
    /// Branch name this edge is taken for (e.g. "true"/"false" on condition
    /// nodes, "failed"/"retry" in the default act/verify/repair wiring).
    /// `None` marks an unlabelled edge, which `next_nodes_for` also uses as the
    /// fallback when no edge matches the requested branch.
    pub branch: Option<String>,
    pub label: Option<String>,
}
241
/// A complete workflow definition: nodes, edges, policies and an audit trail.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Serialized as `_type`; normalization sets it to "workflow_graph" when empty.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph identifier; normalization generates one when empty.
    pub id: String,
    pub name: Option<String>,
    /// Graph version; normalization raises 0 to 1.
    pub version: usize,
    /// Id of the node where traversal starts.
    pub entry: String,
    /// Nodes keyed by node id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    /// Graph-level capability policy; checked against the ceiling in `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Entries appended through `append_audit_entry`.
    pub audit_log: Vec<WorkflowAuditEntry>,
}
258
/// One record in a workflow graph's audit log.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    /// Identifier of this entry (generated by `append_audit_entry`).
    pub id: String,
    /// Name of the operation being recorded.
    pub op: String,
    /// Node the operation applied to, if any.
    pub node_id: Option<String>,
    /// Creation time as produced by `now_rfc3339`.
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
269
/// Result of `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// `true` exactly when `errors` is empty; warnings do not affect it.
    pub valid: bool,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
    /// Ids reached by following edges from the entry node, in sorted order.
    pub reachable_nodes: Vec<String>,
}
278
279pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
280 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
281 let dict = value.as_dict();
282 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
283 node.raw_transcript_policy = dict.and_then(|d| d.get("transcript_policy")).cloned();
284 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
285 Ok(node)
286}
287
288pub fn parse_workflow_node_json(
289 json: serde_json::Value,
290 label: &str,
291) -> Result<WorkflowNode, VmError> {
292 super::parse_json_payload(json, label)
293}
294
295pub fn parse_workflow_edge_json(
296 json: serde_json::Value,
297 label: &str,
298) -> Result<WorkflowEdge, VmError> {
299 super::parse_json_payload(json, label)
300}
301
302pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
303 let mut graph: WorkflowGraph = super::parse_json_value(value)?;
304 let as_dict = value.as_dict().cloned().unwrap_or_default();
305
306 if graph.nodes.is_empty() {
307 for key in ["act", "verify", "repair"] {
308 if let Some(node_value) = as_dict.get(key) {
309 let mut node = parse_workflow_node_value(node_value, "orchestration")?;
310 let raw_node = node_value.as_dict().cloned().unwrap_or_default();
311 node.id = Some(key.to_string());
312 if node.kind.is_empty() {
313 node.kind = if key == "verify" {
314 "verify".to_string()
315 } else {
316 "stage".to_string()
317 };
318 }
319 if node.model_policy.provider.is_none() {
320 node.model_policy.provider = as_dict
321 .get("provider")
322 .map(|value| value.display())
323 .filter(|value| !value.is_empty());
324 }
325 if node.model_policy.model.is_none() {
326 node.model_policy.model = as_dict
327 .get("model")
328 .map(|value| value.display())
329 .filter(|value| !value.is_empty());
330 }
331 if node.model_policy.model_tier.is_none() {
332 node.model_policy.model_tier = as_dict
333 .get("model_tier")
334 .or_else(|| as_dict.get("tier"))
335 .map(|value| value.display())
336 .filter(|value| !value.is_empty());
337 }
338 if node.model_policy.temperature.is_none() {
339 node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
340 if let VmValue::Float(number) = value {
341 Some(*number)
342 } else {
343 value.as_int().map(|number| number as f64)
344 }
345 });
346 }
347 if node.model_policy.max_tokens.is_none() {
348 node.model_policy.max_tokens =
349 as_dict.get("max_tokens").and_then(|value| value.as_int());
350 }
351 if node.mode.is_none() {
352 node.mode = as_dict
353 .get("mode")
354 .map(|value| value.display())
355 .filter(|value| !value.is_empty());
356 }
357 if node.done_sentinel.is_none() {
358 node.done_sentinel = as_dict
359 .get("done_sentinel")
360 .map(|value| value.display())
361 .filter(|value| !value.is_empty());
362 }
363 if key == "verify"
364 && node.verify.is_none()
365 && (raw_node.contains_key("assert_text")
366 || raw_node.contains_key("command")
367 || raw_node.contains_key("expect_status")
368 || raw_node.contains_key("expect_text"))
369 {
370 node.verify = Some(serde_json::json!({
371 "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
372 "command": raw_node.get("command").map(vm_value_to_json),
373 "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
374 "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
375 }));
376 }
377 graph.nodes.insert(key.to_string(), node);
378 }
379 }
380 if graph.entry.is_empty() && graph.nodes.contains_key("act") {
381 graph.entry = "act".to_string();
382 }
383 if graph.edges.is_empty() && graph.nodes.contains_key("act") {
384 if graph.nodes.contains_key("verify") {
385 graph.edges.push(WorkflowEdge {
386 from: "act".to_string(),
387 to: "verify".to_string(),
388 branch: None,
389 label: None,
390 });
391 }
392 if graph.nodes.contains_key("repair") {
393 graph.edges.push(WorkflowEdge {
394 from: "verify".to_string(),
395 to: "repair".to_string(),
396 branch: Some("failed".to_string()),
397 label: None,
398 });
399 graph.edges.push(WorkflowEdge {
400 from: "repair".to_string(),
401 to: "verify".to_string(),
402 branch: Some("retry".to_string()),
403 label: None,
404 });
405 }
406 }
407 }
408
409 if graph.type_name.is_empty() {
410 graph.type_name = "workflow_graph".to_string();
411 }
412 if graph.id.is_empty() {
413 graph.id = new_id("workflow");
414 }
415 if graph.version == 0 {
416 graph.version = 1;
417 }
418 if graph.entry.is_empty() {
419 graph.entry = graph
420 .nodes
421 .keys()
422 .next()
423 .cloned()
424 .unwrap_or_else(|| "act".to_string());
425 }
426 for (node_id, node) in &mut graph.nodes {
427 if node.raw_tools.is_none() {
428 node.raw_tools = as_dict
429 .get("nodes")
430 .and_then(|nodes| nodes.as_dict())
431 .and_then(|nodes| nodes.get(node_id))
432 .and_then(|node_value| node_value.as_dict())
433 .and_then(|raw_node| raw_node.get("tools"))
434 .cloned();
435 }
436 if node.id.is_none() {
437 node.id = Some(node_id.clone());
438 }
439 if node.kind.is_empty() {
440 node.kind = "stage".to_string();
441 }
442 if node.join_policy.strategy.is_empty() {
443 node.join_policy.strategy = "all".to_string();
444 }
445 if node.reduce_policy.strategy.is_empty() {
446 node.reduce_policy.strategy = "concat".to_string();
447 }
448 if node.output_contract.output_kinds.is_empty() {
449 node.output_contract.output_kinds = vec![match node.kind.as_str() {
450 "verify" => "verification_result".to_string(),
451 "reduce" => node
452 .reduce_policy
453 .output_kind
454 .clone()
455 .unwrap_or_else(|| "summary".to_string()),
456 "map" => node
457 .map_policy
458 .output_kind
459 .clone()
460 .unwrap_or_else(|| "artifact".to_string()),
461 "escalation" => "plan".to_string(),
462 _ => "artifact".to_string(),
463 }];
464 }
465 if node.retry_policy.max_attempts == 0 {
466 node.retry_policy.max_attempts = 1;
467 }
468 }
469 Ok(graph)
470}
471
472pub fn validate_workflow(
473 graph: &WorkflowGraph,
474 ceiling: Option<&CapabilityPolicy>,
475) -> WorkflowValidationReport {
476 let mut errors = Vec::new();
477 let mut warnings = Vec::new();
478
479 if !graph.nodes.contains_key(&graph.entry) {
480 errors.push(format!("entry node does not exist: {}", graph.entry));
481 }
482
483 let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
484 for edge in &graph.edges {
485 if !node_ids.contains(&edge.from) {
486 errors.push(format!("edge.from references unknown node: {}", edge.from));
487 }
488 if !node_ids.contains(&edge.to) {
489 errors.push(format!("edge.to references unknown node: {}", edge.to));
490 }
491 }
492
493 let reachable_nodes = reachable_nodes(graph);
494 for node_id in &node_ids {
495 if !reachable_nodes.contains(node_id) {
496 warnings.push(format!("node is unreachable: {node_id}"));
497 }
498 }
499
500 for (node_id, node) in &graph.nodes {
501 let incoming = graph
502 .edges
503 .iter()
504 .filter(|edge| edge.to == *node_id)
505 .count();
506 let outgoing: Vec<&WorkflowEdge> = graph
507 .edges
508 .iter()
509 .filter(|edge| edge.from == *node_id)
510 .collect();
511 if let Some(min_inputs) = node.input_contract.min_inputs {
512 if let Some(max_inputs) = node.input_contract.max_inputs {
513 if min_inputs > max_inputs {
514 errors.push(format!(
515 "node {node_id}: input contract min_inputs exceeds max_inputs"
516 ));
517 }
518 }
519 }
520 match node.kind.as_str() {
521 "condition" => {
522 let has_true = outgoing
523 .iter()
524 .any(|edge| edge.branch.as_deref() == Some("true"));
525 let has_false = outgoing
526 .iter()
527 .any(|edge| edge.branch.as_deref() == Some("false"));
528 if !has_true || !has_false {
529 errors.push(format!(
530 "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
531 ));
532 }
533 }
534 "fork" => {
535 if outgoing.len() < 2 {
536 errors.push(format!(
537 "node {node_id}: fork nodes require at least two outgoing edges"
538 ));
539 }
540 }
541 "join" => {
542 if incoming < 2 {
543 warnings.push(format!(
544 "node {node_id}: join node has fewer than two incoming edges"
545 ));
546 }
547 }
548 "map" => {
549 if node.map_policy.items.is_empty()
550 && node.map_policy.item_artifact_kind.is_none()
551 && node.input_contract.input_kinds.is_empty()
552 {
553 errors.push(format!(
554 "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
555 ));
556 }
557 }
558 "reduce" => {
559 if node.input_contract.input_kinds.is_empty() {
560 warnings.push(format!(
561 "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
562 ));
563 }
564 }
565 _ => {}
566 }
567 }
568
569 if let Some(ceiling) = ceiling {
570 if let Err(error) = ceiling.intersect(&graph.capability_policy) {
571 errors.push(error);
572 }
573 for (node_id, node) in &graph.nodes {
574 if let Err(error) = ceiling.intersect(&node.capability_policy) {
575 errors.push(format!("node {node_id}: {error}"));
576 }
577 }
578 }
579
580 WorkflowValidationReport {
581 valid: errors.is_empty(),
582 errors,
583 warnings,
584 reachable_nodes: reachable_nodes.into_iter().collect(),
585 }
586}
587
588fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
589 let mut seen = BTreeSet::new();
590 let mut stack = vec![graph.entry.clone()];
591 while let Some(node_id) = stack.pop() {
592 if !seen.insert(node_id.clone()) {
593 continue;
594 }
595 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
596 stack.push(edge.to.clone());
597 }
598 }
599 seen
600}
601
/// Executes one workflow node and packages its output as an artifact.
///
/// Steps, in order:
/// 1. Select input artifacts according to the node's context policy (falling
///    back to the input contract's kinds when the policy names none) and render
///    them into a context string.
/// 2. Apply the input transcript policy and enforce the input contract
///    (`require_transcript`, `min_inputs`, `max_inputs`).
/// 3. Produce a result:
///    - `verify` nodes run the `verify.command` shell command, if one is set,
///      and otherwise yield an empty "completed" result;
///    - other nodes call the LLM, using the agent loop when `mode` is "agent"
///      or the node declares tools, and a single LLM call otherwise.
/// 4. Annotate the result with prompt/context details, apply the output
///    transcript policy and wrap everything in a single new artifact.
///
/// Returns `(result JSON, produced artifacts, resulting transcript)`.
///
/// # Errors
/// Returns `VmError::Runtime` when the input contract is violated, when the
/// verify command cannot be spawned, or when the tool policy cannot be
/// intersected with the node's capability policy. Errors from
/// `extract_llm_options`, the agent loop and the LLM call are propagated.
pub async fn execute_stage_node(
    node_id: &str,
    node: &WorkflowNode,
    task: &str,
    artifacts: &[ArtifactRecord],
    transcript: Option<VmValue>,
) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
    // Selection uses a copy of the context policy so the input contract's kinds
    // can act as the default filter without mutating the node.
    let mut selection_policy = node.context_policy.clone();
    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
    }
    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
    // Rendering uses the node's original context policy, not the adjusted copy.
    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
    let input_transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
    if node.input_contract.require_transcript && input_transcript.is_none() {
        return Err(VmError::Runtime(format!(
            "workflow stage {node_id} requires transcript input"
        )));
    }
    if let Some(min_inputs) = node.input_contract.min_inputs {
        if selected.len() < min_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
            )));
        }
    }
    if let Some(max_inputs) = node.input_contract.max_inputs {
        if selected.len() > max_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
            )));
        }
    }
    // Without context the task is the whole prompt; otherwise the task follows
    // the rendered context under a label ("Task" by default).
    let prompt = if rendered_context.is_empty() {
        task.to_string()
    } else {
        format!(
            "{rendered_context}\n\n{}:\n{task}",
            node.task_label
                .clone()
                .unwrap_or_else(|| "Task".to_string())
        )
    };

    // Tool-calling format: explicit HARN_AGENT_TOOL_FORMAT wins (blank counts as
    // unset); otherwise derive it from the HARN_LLM_MODEL / HARN_LLM_PROVIDER
    // environment variables.
    let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
        .ok()
        .filter(|value| !value.trim().is_empty())
        .unwrap_or_else(|| {
            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
            crate::llm_config::default_tool_format(&model, &provider)
        });
    let mut llm_result = if node.kind == "verify" {
        // Verify nodes never call the LLM here: they run the configured
        // command, or produce an empty result when none is set.
        if let Some(command) = node
            .verify
            .as_ref()
            .and_then(|verify| verify.as_object())
            .and_then(|verify| verify.get("command"))
            .and_then(|value| value.as_str())
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            // Run through the platform shell: `cmd /C` on Windows, `/bin/sh -lc` elsewhere.
            let mut process = if cfg!(target_os = "windows") {
                let mut cmd = tokio::process::Command::new("cmd");
                cmd.arg("/C").arg(command);
                cmd
            } else {
                let mut cmd = tokio::process::Command::new("/bin/sh");
                cmd.arg("-lc").arg(command);
                cmd
            };
            process.stdin(std::process::Stdio::null());
            // Apply working directory and environment from the current
            // execution context, when one is available.
            if let Some(context) = crate::stdlib::process::current_execution_context() {
                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
                    process.current_dir(cwd);
                }
                if !context.env.is_empty() {
                    process.envs(context.env);
                }
            }
            let output = process
                .output()
                .await
                .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
            // Combined text is stdout followed by stderr, omitting whichever is empty.
            let combined = if stderr.is_empty() {
                stdout.clone()
            } else if stdout.is_empty() {
                stderr.clone()
            } else {
                format!("{stdout}\n{stderr}")
            };
            // "status" is always "completed" once the command ran; the outcome
            // itself is carried by "success" and "exit_status" (-1 when the
            // process reported no exit code).
            serde_json::json!({
                "status": "completed",
                "text": combined,
                "visible_text": combined,
                "command": command,
                "stdout": stdout,
                "stderr": stderr,
                "exit_status": output.status.code().unwrap_or(-1),
                "success": output.status.success(),
            })
        } else {
            serde_json::json!({
                "status": "completed",
                "text": "",
                "visible_text": "",
            })
        }
    } else {
        // Translate the node's model policy into the option dict expected by
        // `extract_llm_options`.
        let mut options = BTreeMap::new();
        if let Some(provider) = &node.model_policy.provider {
            options.insert(
                "provider".to_string(),
                VmValue::String(Rc::from(provider.clone())),
            );
        }
        if let Some(model) = &node.model_policy.model {
            options.insert(
                "model".to_string(),
                VmValue::String(Rc::from(model.clone())),
            );
        }
        if let Some(model_tier) = &node.model_policy.model_tier {
            options.insert(
                "model_tier".to_string(),
                VmValue::String(Rc::from(model_tier.clone())),
            );
        }
        if let Some(temperature) = node.model_policy.temperature {
            options.insert("temperature".to_string(), VmValue::Float(temperature));
        }
        if let Some(max_tokens) = node.model_policy.max_tokens {
            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
        }
        let tool_names = workflow_tool_names(&node.tools);
        // Prefer the original VM tools value; fall back to converting the JSON form.
        let tools_value = node.raw_tools.clone().or_else(|| {
            if matches!(node.tools, serde_json::Value::Null) {
                None
            } else {
                Some(crate::stdlib::json_to_vm_value(&node.tools))
            }
        });
        // Tools are only passed on when at least one named tool is declared.
        if tools_value.is_some() && !tool_names.is_empty() {
            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
        }
        if let Some(transcript) = input_transcript.clone() {
            options.insert("transcript".to_string(), transcript);
        }

        // Positional arguments: prompt, system prompt (or nil), options dict.
        let args = vec![
            VmValue::String(Rc::from(prompt.clone())),
            node.system
                .clone()
                .map(|s| VmValue::String(Rc::from(s)))
                .unwrap_or(VmValue::Nil),
            VmValue::Dict(Rc::new(options)),
        ];
        let mut opts = extract_llm_options(&args)?;

        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
            // Agent loop: the effective policy is what the tools declare,
            // narrowed by the node's own capability policy.
            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
            let effective_policy = tool_policy
                .intersect(&node.capability_policy)
                .map_err(VmError::Runtime)?;
            let auto_compact = if node.transcript_policy.auto_compact {
                // Start from the defaults and override with whatever the
                // transcript policy specifies.
                let mut ac = crate::orchestration::AutoCompactConfig::default();
                if let Some(v) = node.transcript_policy.compact_threshold {
                    ac.token_threshold = v;
                }
                if let Some(v) = node.transcript_policy.tool_output_max_chars {
                    ac.tool_output_max_chars = v;
                }
                // A strategy name that fails to parse is ignored and the default kept.
                if let Some(ref strategy) = node.transcript_policy.compact_strategy {
                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
                        ac.compact_strategy = s;
                    }
                }
                if let Some(v) = node.transcript_policy.hard_limit_tokens {
                    ac.hard_limit_tokens = Some(v);
                }
                if let Some(ref strategy) = node.transcript_policy.hard_limit_strategy {
                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
                        ac.hard_limit_strategy = s;
                    }
                }
                // Callbacks are taken from the raw VM transcript policy, since
                // that field is skipped by serde.
                if let Some(ref raw_tp) = node.raw_transcript_policy {
                    if let Some(dict) = raw_tp.as_dict() {
                        if let Some(cb) = dict.get("compress_callback") {
                            ac.compress_callback = Some(cb.clone());
                        }
                        if let Some(cb) = dict.get("mask_callback") {
                            ac.mask_callback = Some(cb.clone());
                        }
                    }
                }
                {
                    // Tell the adapter which limits were set explicitly by the
                    // policy. NOTE(review): presumably so it leaves those
                    // untouched; confirm in `adapt_auto_compact_to_provider`.
                    let user_specified_threshold =
                        node.transcript_policy.compact_threshold.is_some();
                    let user_specified_hard_limit =
                        node.transcript_policy.hard_limit_tokens.is_some();
                    crate::llm::api::adapt_auto_compact_to_provider(
                        &mut ac,
                        user_specified_threshold,
                        user_specified_hard_limit,
                        &opts.provider,
                        &opts.model,
                        &opts.api_key,
                    )
                    .await;
                }
                Some(ac)
            } else {
                None
            };
            crate::llm::run_agent_loop_internal(
                &mut opts,
                crate::llm::AgentLoopConfig {
                    persistent: true,
                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
                    nudge: node.model_policy.nudge.clone(),
                    done_sentinel: node.done_sentinel.clone(),
                    break_unless_phase: None,
                    tool_retries: 0,
                    tool_backoff_ms: 1000,
                    tool_format: tool_format.clone(),
                    auto_compact,
                    context_callback: None,
                    policy: Some(effective_policy),
                    approval_policy: Some(node.approval_policy.clone()),
                    daemon: false,
                    daemon_config: Default::default(),
                    llm_retries: 2,
                    llm_backoff_ms: 2000,
                    exit_when_verified: node.exit_when_verified,
                    loop_detect_warn: 2,
                    loop_detect_block: 3,
                    loop_detect_skip: 4,
                    tool_examples: node.model_policy.tool_examples.clone(),
                    // The three callbacks below come from the raw VM model
                    // policy, since that field is skipped by serde.
                    post_turn_callback: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("post_turn_callback"))
                        .cloned(),
                    turn_policy: node.model_policy.turn_policy.clone(),
                    stop_after_successful_tools: node
                        .model_policy
                        .stop_after_successful_tools
                        .clone(),
                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
                    on_tool_call: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("on_tool_call"))
                        .cloned(),
                    on_tool_result: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("on_tool_result"))
                        .cloned(),
                },
            )
            .await?
        } else {
            // No tools and not agent mode: a single LLM call, converted into
            // the same result shape as the agent loop.
            let result = vm_call_llm_full(&opts).await?;
            crate::llm::agent_loop_result_from_llm(&result, opts)
        }
    };
    // Attach details about the inputs; skipped when the result is not a JSON object.
    if let Some(payload) = llm_result.as_object_mut() {
        payload.insert("prompt".to_string(), serde_json::json!(prompt));
        payload.insert(
            "system_prompt".to_string(),
            serde_json::json!(node.system.clone().unwrap_or_default()),
        );
        payload.insert(
            "rendered_context".to_string(),
            serde_json::json!(rendered_context),
        );
        payload.insert(
            "selected_artifact_ids".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.id.clone())
                .collect::<Vec<_>>()),
        );
        payload.insert(
            "selected_artifact_titles".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.title.clone())
                .collect::<Vec<_>>()),
        );
        payload.insert(
            "tool_calling_mode".to_string(),
            serde_json::json!(tool_format.clone()),
        );
    }

    // NOTE(review): the artifact text is read from the result's "text" key (not
    // "visible_text"), despite the variable name; confirm this is intended.
    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
    // Use the transcript returned with the result when there is one, otherwise
    // carry the input transcript forward.
    let result_transcript = llm_result
        .get("transcript")
        .cloned()
        .map(|value| crate::stdlib::json_to_vm_value(&value));
    let transcript = apply_output_transcript_policy(
        result_transcript.or(input_transcript),
        &node.transcript_policy,
    );
    // The first declared output kind wins; otherwise fall back by node kind.
    let output_kind = node
        .output_contract
        .output_kinds
        .first()
        .cloned()
        .unwrap_or_else(|| {
            if node.kind == "verify" {
                "verification_result".to_string()
            } else {
                "artifact".to_string()
            }
        });
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "input_artifact_ids".to_string(),
        serde_json::json!(selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect::<Vec<_>>()),
    );
    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
    // The produced artifact records the full result as `data` and the selected
    // inputs as its lineage.
    let artifact = ArtifactRecord {
        type_name: "artifact".to_string(),
        id: new_id("artifact"),
        kind: output_kind,
        title: Some(format!("stage {node_id} output")),
        text: Some(visible_text),
        data: Some(llm_result.clone()),
        source: Some(node_id.to_string()),
        created_at: now_rfc3339(),
        freshness: Some("fresh".to_string()),
        priority: None,
        lineage: selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect(),
        relevance: Some(1.0),
        estimated_tokens: None,
        stage: Some(node_id.to_string()),
        metadata,
    }
    .normalize();

    Ok((llm_result, vec![artifact], transcript))
}
967
968pub fn next_nodes_for(
969 graph: &WorkflowGraph,
970 current: &str,
971 branch: Option<&str>,
972) -> Vec<WorkflowEdge> {
973 let mut matching: Vec<WorkflowEdge> = graph
974 .edges
975 .iter()
976 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
977 .cloned()
978 .collect();
979 if matching.is_empty() {
980 matching = graph
981 .edges
982 .iter()
983 .filter(|edge| edge.from == current && edge.branch.is_none())
984 .cloned()
985 .collect();
986 }
987 matching
988}
989
990pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
991 next_nodes_for(graph, current, Some(branch))
992 .into_iter()
993 .next()
994 .map(|edge| edge.to)
995}
996
997pub fn append_audit_entry(
998 graph: &mut WorkflowGraph,
999 op: &str,
1000 node_id: Option<String>,
1001 reason: Option<String>,
1002 metadata: BTreeMap<String, serde_json::Value>,
1003) {
1004 graph.audit_log.push(WorkflowAuditEntry {
1005 id: new_id("audit"),
1006 op: op.to_string(),
1007 node_id,
1008 timestamp: now_rfc3339(),
1009 reason,
1010 metadata,
1011 });
1012}