1use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 apply_input_transcript_policy, apply_output_transcript_policy, new_id, now_rfc3339,
10 ArtifactRecord, BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy,
11 MapPolicy, ModelPolicy, ReducePolicy, RetryPolicy, StageContract, ToolRuntimePolicyMetadata,
12 TranscriptPolicy,
13};
14use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
15use crate::value::{VmError, VmValue};
16
/// One node of a workflow graph together with the policies and contracts attached to it.
///
/// The struct-level `#[serde(default)]` means any field missing from the serialized form takes
/// its `Default` value. The three `raw_*` fields are marked `#[serde(skip)]`, so they never
/// appear in serialized output; `parse_workflow_node_value` fills them from the source VM value.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier. `normalize_workflow_value` fills it from the node's map key when absent.
    pub id: Option<String>,
    /// Node kind. This file gives special treatment to `"stage"`, `"verify"`, `"condition"`,
    /// `"fork"`, `"join"`, `"map"`, `"reduce"` and `"escalation"`.
    pub kind: String,
    /// Execution mode; `Some("agent")` makes `execute_stage_node` run the agent loop.
    pub mode: Option<String>,
    /// Optional prompt text (not read by the code in this part of the file).
    pub prompt: Option<String>,
    /// System prompt handed to the LLM call; `Nil` is passed when absent.
    pub system: Option<String>,
    /// Label placed before the task text when rendered context is prepended (defaults to `"Task"`).
    pub task_label: Option<String>,
    /// Forwarded unchanged to the agent loop configuration as `done_sentinel`.
    pub done_sentinel: Option<String>,
    /// JSON description of the node's tools; see `workflow_tool_names` for the accepted shapes.
    pub tools: serde_json::Value,
    /// Model selection and agent-loop tuning (provider, model, temperature, iteration limits, ...).
    pub model_policy: ModelPolicy,
    /// Controls how the transcript is filtered on input/output and whether auto-compaction runs.
    pub transcript_policy: TranscriptPolicy,
    /// Controls which artifacts are selected and how they are rendered into the prompt.
    pub context_policy: ContextPolicy,
    /// Retry settings; `normalize_workflow_value` raises `max_attempts` from 0 to 1.
    pub retry_policy: RetryPolicy,
    /// Capability limits intersected with the tool-derived policy and any validation ceiling.
    pub capability_policy: CapabilityPolicy,
    /// Tool approval policy forwarded to the agent loop.
    pub approval_policy: super::ToolApprovalPolicy,
    /// Requirements on the stage inputs (artifact kinds, min/max count, transcript presence).
    pub input_contract: StageContract,
    /// Declares the output artifact kinds; the first entry is used for the produced artifact.
    pub output_contract: StageContract,
    /// Branch semantics for the node (not read by the code in this part of the file).
    pub branch_semantics: BranchSemantics,
    /// Settings for `"map"` nodes (items, item artifact kind, output kind).
    pub map_policy: MapPolicy,
    /// Settings for `"join"` nodes; `strategy` defaults to `"all"` during normalization.
    pub join_policy: JoinPolicy,
    /// Settings for `"reduce"` nodes; `strategy` defaults to `"concat"` during normalization.
    pub reduce_policy: ReducePolicy,
    /// Escalation settings (not read by the code in this part of the file).
    pub escalation_policy: EscalationPolicy,
    /// Verification spec. For `"verify"` nodes a non-empty string under the `"command"` key is
    /// executed as a shell command by `execute_stage_node`.
    pub verify: Option<serde_json::Value>,
    /// Forwarded unchanged to the agent loop configuration as `exit_when_verified`.
    #[serde(default)]
    pub exit_when_verified: bool,
    /// Optional timeout, presumably in milliseconds going by the name; not read by the code in
    /// this part of the file. TODO confirm where it is enforced.
    #[serde(default)]
    pub timeout_ms: Option<u64>,
    /// Free-form JSON metadata attached to the node.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Original VM value of the `tools` entry; preferred over `tools` when building LLM options.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Original VM value of `transcript_policy`; the source of the `compress_callback` and
    /// `mask_callback` values used for auto-compaction.
    #[serde(skip)]
    pub raw_transcript_policy: Option<VmValue>,
    /// Original VM value of `model_policy`; the source of the `post_turn_callback`,
    /// `on_tool_call` and `on_tool_result` values passed to the agent loop.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
}
65
66impl PartialEq for WorkflowNode {
67 fn eq(&self, other: &Self) -> bool {
68 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
69 }
70}
71
72pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
73 match value {
74 serde_json::Value::Null => Vec::new(),
75 serde_json::Value::Array(items) => items
76 .iter()
77 .filter_map(|item| match item {
78 serde_json::Value::Object(map) => map
79 .get("name")
80 .and_then(|value| value.as_str())
81 .filter(|name| !name.is_empty())
82 .map(|name| name.to_string()),
83 _ => None,
84 })
85 .collect(),
86 serde_json::Value::Object(map) => {
87 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
88 return map
89 .get("tools")
90 .map(workflow_tool_names)
91 .unwrap_or_default();
92 }
93 map.get("name")
94 .and_then(|value| value.as_str())
95 .filter(|name| !name.is_empty())
96 .map(|name| vec![name.to_string()])
97 .unwrap_or_default()
98 }
99 _ => Vec::new(),
100 }
101}
102
/// Returns the highest-ranked side-effect level among `levels`, or `None` when it is empty.
///
/// Known levels rank `none < read_only < workspace_write < process_exec < network`; any other
/// string ranks above all of them. When several levels share the top rank the last one wins.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const ORDER: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    // Unknown levels get the rank just past the end of the table.
    let severity = |level: &str| {
        ORDER
            .iter()
            .position(|known| *known == level)
            .unwrap_or(ORDER.len())
    };

    levels.fold(None, |best: Option<String>, candidate| match best {
        // Keep the current winner only when it is strictly stronger; ties go to the newcomer.
        Some(current) if severity(&current) > severity(&candidate) => Some(current),
        _ => Some(candidate),
    })
}
116
117fn parse_tool_runtime_policy(
118 map: &serde_json::Map<String, serde_json::Value>,
119) -> ToolRuntimePolicyMetadata {
120 let Some(policy) = map.get("policy").and_then(|value| value.as_object()) else {
121 return ToolRuntimePolicyMetadata::default();
122 };
123
124 let capabilities = policy
125 .get("capabilities")
126 .and_then(|value| value.as_object())
127 .map(|caps| {
128 caps.iter()
129 .map(|(capability, ops)| {
130 let values = ops
131 .as_array()
132 .map(|items| {
133 items
134 .iter()
135 .filter_map(|item| item.as_str().map(|s| s.to_string()))
136 .collect::<Vec<_>>()
137 })
138 .unwrap_or_default();
139 (capability.clone(), values)
140 })
141 .collect::<BTreeMap<_, _>>()
142 })
143 .unwrap_or_default();
144
145 let path_params = policy
146 .get("path_params")
147 .and_then(|value| value.as_array())
148 .map(|items| {
149 items
150 .iter()
151 .filter_map(|item| item.as_str().map(|s| s.to_string()))
152 .collect::<Vec<_>>()
153 })
154 .unwrap_or_default();
155
156 ToolRuntimePolicyMetadata {
157 capabilities,
158 side_effect_level: policy
159 .get("side_effect_level")
160 .and_then(|value| value.as_str())
161 .map(|s| s.to_string()),
162 path_params,
163 mutation_classification: policy
164 .get("mutation_classification")
165 .and_then(|value| value.as_str())
166 .map(|s| s.to_string()),
167 }
168}
169
170pub fn workflow_tool_metadata(
171 value: &serde_json::Value,
172) -> BTreeMap<String, ToolRuntimePolicyMetadata> {
173 match value {
174 serde_json::Value::Null => BTreeMap::new(),
175 serde_json::Value::Array(items) => items
176 .iter()
177 .filter_map(|item| match item {
178 serde_json::Value::Object(map) => map
179 .get("name")
180 .and_then(|value| value.as_str())
181 .filter(|name| !name.is_empty())
182 .map(|name| (name.to_string(), parse_tool_runtime_policy(map))),
183 _ => None,
184 })
185 .collect(),
186 serde_json::Value::Object(map) => {
187 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
188 return map
189 .get("tools")
190 .map(workflow_tool_metadata)
191 .unwrap_or_default();
192 }
193 map.get("name")
194 .and_then(|value| value.as_str())
195 .filter(|name| !name.is_empty())
196 .map(|name| {
197 let mut metadata = BTreeMap::new();
198 metadata.insert(name.to_string(), parse_tool_runtime_policy(map));
199 metadata
200 })
201 .unwrap_or_default()
202 }
203 _ => BTreeMap::new(),
204 }
205}
206
207pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
208 let tools = workflow_tool_names(value);
209 let tool_metadata = workflow_tool_metadata(value);
210 let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
211 for metadata in tool_metadata.values() {
212 for (capability, ops) in &metadata.capabilities {
213 let entry = capabilities.entry(capability.clone()).or_default();
214 for op in ops {
215 if !entry.contains(op) {
216 entry.push(op.clone());
217 }
218 }
219 entry.sort();
220 }
221 }
222 let side_effect_level = max_side_effect_level(
223 tool_metadata
224 .values()
225 .filter_map(|metadata| metadata.side_effect_level.clone()),
226 );
227 CapabilityPolicy {
228 tools,
229 capabilities,
230 workspace_roots: Vec::new(),
231 side_effect_level,
232 recursion_limit: None,
233 tool_arg_constraints: Vec::new(),
234 tool_metadata,
235 }
236}
237
/// A directed edge between two workflow nodes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the node the edge leaves.
    pub from: String,
    /// Id of the node the edge enters.
    pub to: String,
    /// Branch name that selects this edge in `next_nodes_for`; edges with `None` act as the
    /// fallback when no edge matches the requested branch.
    pub branch: Option<String>,
    /// Optional label (not read by the code in this part of the file).
    pub label: Option<String>,
}
246
/// A workflow: nodes, the edges between them, graph-wide policies and an audit trail.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Type tag, serialized as `_type`; `normalize_workflow_value` sets it to
    /// `"workflow_graph"` when empty.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph identifier; generated with `new_id("workflow")` during normalization when empty.
    pub id: String,
    /// Optional human-readable name.
    pub name: Option<String>,
    /// Version number; normalization turns 0 into 1.
    pub version: usize,
    /// Id of the node where traversal starts.
    pub entry: String,
    /// Nodes keyed by node id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    /// Directed edges between nodes.
    pub edges: Vec<WorkflowEdge>,
    /// Graph-level capability policy; `validate_workflow` checks it against the ceiling.
    pub capability_policy: CapabilityPolicy,
    /// Graph-level tool approval policy (not read by the code in this part of the file).
    pub approval_policy: super::ToolApprovalPolicy,
    /// Free-form JSON metadata attached to the graph.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Entries appended by `append_audit_entry`.
    pub audit_log: Vec<WorkflowAuditEntry>,
}
263
/// One record in a workflow graph's audit log (see `append_audit_entry`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    /// Entry identifier; `append_audit_entry` generates it with `new_id("audit")`.
    pub id: String,
    /// Name of the operation being recorded.
    pub op: String,
    /// Node the operation applied to, if any.
    pub node_id: Option<String>,
    /// Creation time; `append_audit_entry` fills it from `now_rfc3339()`.
    pub timestamp: String,
    /// Optional explanation supplied by the caller.
    pub reason: Option<String>,
    /// Free-form JSON metadata supplied by the caller.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
274
/// Result of `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// `true` exactly when `errors` is empty; warnings do not affect validity.
    pub valid: bool,
    /// Problems that make the graph invalid.
    pub errors: Vec<String>,
    /// Non-fatal findings such as unreachable nodes.
    pub warnings: Vec<String>,
    /// Ids reached by following edges from the entry node, in sorted order.
    pub reachable_nodes: Vec<String>,
}
283
284pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
285 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
286 let dict = value.as_dict();
287 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
288 node.raw_transcript_policy = dict.and_then(|d| d.get("transcript_policy")).cloned();
289 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
290 Ok(node)
291}
292
/// Parses a `WorkflowNode` from a JSON value by delegating to `super::parse_json_payload`.
///
/// Unlike `parse_workflow_node_value`, nothing here assigns the `raw_*` fields.
///
/// # Errors
///
/// Returns whatever error `super::parse_json_payload` reports; `label` is passed through to it.
pub fn parse_workflow_node_json(
    json: serde_json::Value,
    label: &str,
) -> Result<WorkflowNode, VmError> {
    super::parse_json_payload(json, label)
}
299
/// Parses a `WorkflowEdge` from a JSON value by delegating to `super::parse_json_payload`.
///
/// # Errors
///
/// Returns whatever error `super::parse_json_payload` reports; `label` is passed through to it.
pub fn parse_workflow_edge_json(
    json: serde_json::Value,
    label: &str,
) -> Result<WorkflowEdge, VmError> {
    super::parse_json_payload(json, label)
}
306
/// Parses a VM value into a `WorkflowGraph` and fills in defaults.
///
/// Two input shapes are handled:
///
/// * a graph that already has `nodes`: only the defaults at the end are applied;
/// * a graph without nodes but with top-level `act` / `verify` / `repair` entries: each entry
///   becomes a node with that id, inherits unset model settings from the top-level dictionary,
///   and default edges are generated between them.
///
/// Afterwards the graph gets a type name, id, version and entry node when they are empty, and
/// every node gets an id, kind, join/reduce strategy, output kind and a minimum of one retry
/// attempt.
///
/// # Errors
///
/// Propagates errors from `super::parse_json_value` and `parse_workflow_node_value`.
pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
    // Raw VM dictionary (empty when `value` is not a dictionary); consulted for the
    // act/verify/repair form and for recovering raw `tools` values further down.
    let as_dict = value.as_dict().cloned().unwrap_or_default();

    if graph.nodes.is_empty() {
        // No explicit nodes: build them from the top-level act/verify/repair entries.
        for key in ["act", "verify", "repair"] {
            if let Some(node_value) = as_dict.get(key) {
                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
                // The key always becomes the node id, replacing any id the entry declared.
                node.id = Some(key.to_string());
                if node.kind.is_empty() {
                    node.kind = if key == "verify" {
                        "verify".to_string()
                    } else {
                        "stage".to_string()
                    };
                }
                // Model settings the node leaves unset are inherited from the top level.
                if node.model_policy.provider.is_none() {
                    node.model_policy.provider = as_dict
                        .get("provider")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model.is_none() {
                    node.model_policy.model = as_dict
                        .get("model")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model_tier.is_none() {
                    // `tier` is accepted as an alternative spelling of `model_tier`.
                    node.model_policy.model_tier = as_dict
                        .get("model_tier")
                        .or_else(|| as_dict.get("tier"))
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.temperature.is_none() {
                    // Accept both float and integer temperatures.
                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
                        if let VmValue::Float(number) = value {
                            Some(*number)
                        } else {
                            value.as_int().map(|number| number as f64)
                        }
                    });
                }
                if node.model_policy.max_tokens.is_none() {
                    node.model_policy.max_tokens =
                        as_dict.get("max_tokens").and_then(|value| value.as_int());
                }
                if node.mode.is_none() {
                    node.mode = as_dict
                        .get("mode")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.done_sentinel.is_none() {
                    node.done_sentinel = as_dict
                        .get("done_sentinel")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                // A verify entry may carry its verification keys inline; gather them into
                // `node.verify` when no explicit `verify` object was given. Keys that are
                // absent end up as JSON null.
                if key == "verify"
                    && node.verify.is_none()
                    && (raw_node.contains_key("assert_text")
                        || raw_node.contains_key("command")
                        || raw_node.contains_key("expect_status")
                        || raw_node.contains_key("expect_text"))
                {
                    node.verify = Some(serde_json::json!({
                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
                        "command": raw_node.get("command").map(vm_value_to_json),
                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
                    }));
                }
                graph.nodes.insert(key.to_string(), node);
            }
        }
        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
            graph.entry = "act".to_string();
        }
        // Default wiring: act -> verify, verify -(failed)-> repair, repair -(retry)-> verify.
        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
            if graph.nodes.contains_key("verify") {
                graph.edges.push(WorkflowEdge {
                    from: "act".to_string(),
                    to: "verify".to_string(),
                    branch: None,
                    label: None,
                });
            }
            // NOTE(review): the repair edges are added without checking that a "verify" node
            // exists, so act + repair without verify yields edges to an unknown node (which
            // `validate_workflow` reports as errors) — confirm this is intended.
            if graph.nodes.contains_key("repair") {
                graph.edges.push(WorkflowEdge {
                    from: "verify".to_string(),
                    to: "repair".to_string(),
                    branch: Some("failed".to_string()),
                    label: None,
                });
                graph.edges.push(WorkflowEdge {
                    from: "repair".to_string(),
                    to: "verify".to_string(),
                    branch: Some("retry".to_string()),
                    label: None,
                });
            }
        }
    }

    // Graph-level defaults.
    if graph.type_name.is_empty() {
        graph.type_name = "workflow_graph".to_string();
    }
    if graph.id.is_empty() {
        graph.id = new_id("workflow");
    }
    if graph.version == 0 {
        graph.version = 1;
    }
    if graph.entry.is_empty() {
        // Fall back to the first node id in key order, or "act" when there are no nodes.
        graph.entry = graph
            .nodes
            .keys()
            .next()
            .cloned()
            .unwrap_or_else(|| "act".to_string());
    }
    // Node-level defaults.
    for (node_id, node) in &mut graph.nodes {
        if node.raw_tools.is_none() {
            // Recover the raw VM `tools` value from `nodes.<id>.tools` in the input dictionary.
            node.raw_tools = as_dict
                .get("nodes")
                .and_then(|nodes| nodes.as_dict())
                .and_then(|nodes| nodes.get(node_id))
                .and_then(|node_value| node_value.as_dict())
                .and_then(|raw_node| raw_node.get("tools"))
                .cloned();
        }
        if node.id.is_none() {
            node.id = Some(node_id.clone());
        }
        if node.kind.is_empty() {
            node.kind = "stage".to_string();
        }
        if node.join_policy.strategy.is_empty() {
            node.join_policy.strategy = "all".to_string();
        }
        if node.reduce_policy.strategy.is_empty() {
            node.reduce_policy.strategy = "concat".to_string();
        }
        if node.output_contract.output_kinds.is_empty() {
            // The default output kind depends on the node kind.
            node.output_contract.output_kinds = vec![match node.kind.as_str() {
                "verify" => "verification_result".to_string(),
                "reduce" => node
                    .reduce_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "summary".to_string()),
                "map" => node
                    .map_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "artifact".to_string()),
                "escalation" => "plan".to_string(),
                _ => "artifact".to_string(),
            }];
        }
        if node.retry_policy.max_attempts == 0 {
            node.retry_policy.max_attempts = 1;
        }
    }
    Ok(graph)
}
476
477pub fn validate_workflow(
478 graph: &WorkflowGraph,
479 ceiling: Option<&CapabilityPolicy>,
480) -> WorkflowValidationReport {
481 let mut errors = Vec::new();
482 let mut warnings = Vec::new();
483
484 if !graph.nodes.contains_key(&graph.entry) {
485 errors.push(format!("entry node does not exist: {}", graph.entry));
486 }
487
488 let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
489 for edge in &graph.edges {
490 if !node_ids.contains(&edge.from) {
491 errors.push(format!("edge.from references unknown node: {}", edge.from));
492 }
493 if !node_ids.contains(&edge.to) {
494 errors.push(format!("edge.to references unknown node: {}", edge.to));
495 }
496 }
497
498 let reachable_nodes = reachable_nodes(graph);
499 for node_id in &node_ids {
500 if !reachable_nodes.contains(node_id) {
501 warnings.push(format!("node is unreachable: {node_id}"));
502 }
503 }
504
505 for (node_id, node) in &graph.nodes {
506 let incoming = graph
507 .edges
508 .iter()
509 .filter(|edge| edge.to == *node_id)
510 .count();
511 let outgoing: Vec<&WorkflowEdge> = graph
512 .edges
513 .iter()
514 .filter(|edge| edge.from == *node_id)
515 .collect();
516 if let Some(min_inputs) = node.input_contract.min_inputs {
517 if let Some(max_inputs) = node.input_contract.max_inputs {
518 if min_inputs > max_inputs {
519 errors.push(format!(
520 "node {node_id}: input contract min_inputs exceeds max_inputs"
521 ));
522 }
523 }
524 }
525 match node.kind.as_str() {
526 "condition" => {
527 let has_true = outgoing
528 .iter()
529 .any(|edge| edge.branch.as_deref() == Some("true"));
530 let has_false = outgoing
531 .iter()
532 .any(|edge| edge.branch.as_deref() == Some("false"));
533 if !has_true || !has_false {
534 errors.push(format!(
535 "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
536 ));
537 }
538 }
539 "fork" => {
540 if outgoing.len() < 2 {
541 errors.push(format!(
542 "node {node_id}: fork nodes require at least two outgoing edges"
543 ));
544 }
545 }
546 "join" => {
547 if incoming < 2 {
548 warnings.push(format!(
549 "node {node_id}: join node has fewer than two incoming edges"
550 ));
551 }
552 }
553 "map" => {
554 if node.map_policy.items.is_empty()
555 && node.map_policy.item_artifact_kind.is_none()
556 && node.input_contract.input_kinds.is_empty()
557 {
558 errors.push(format!(
559 "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
560 ));
561 }
562 }
563 "reduce" => {
564 if node.input_contract.input_kinds.is_empty() {
565 warnings.push(format!(
566 "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
567 ));
568 }
569 }
570 _ => {}
571 }
572 }
573
574 if let Some(ceiling) = ceiling {
575 if let Err(error) = ceiling.intersect(&graph.capability_policy) {
576 errors.push(error);
577 }
578 for (node_id, node) in &graph.nodes {
579 if let Err(error) = ceiling.intersect(&node.capability_policy) {
580 errors.push(format!("node {node_id}: {error}"));
581 }
582 }
583 }
584
585 WorkflowValidationReport {
586 valid: errors.is_empty(),
587 errors,
588 warnings,
589 reachable_nodes: reachable_nodes.into_iter().collect(),
590 }
591}
592
593fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
594 let mut seen = BTreeSet::new();
595 let mut stack = vec![graph.entry.clone()];
596 while let Some(node_id) = stack.pop() {
597 if !seen.insert(node_id.clone()) {
598 continue;
599 }
600 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
601 stack.push(edge.to.clone());
602 }
603 }
604 seen
605}
606
/// Executes a single workflow node and packages its result as an artifact.
///
/// Steps, in order:
///
/// 1. Select input artifacts using the node's context policy (falling back to the input
///    contract's `input_kinds` when the policy lists no `include_kinds`) and render them.
/// 2. Apply the input transcript policy and enforce the input contract.
/// 3. Build the prompt: the task alone, or rendered context followed by a labelled task.
/// 4. Produce the result payload:
///    * `verify` nodes run `verify.command` in a shell when it is a non-empty string, and
///      otherwise produce an empty "completed" payload; no LLM call is made for them;
///    * other nodes run the agent loop when `mode` is `"agent"` or tools are declared, and
///      otherwise make a single LLM call.
/// 5. Attach prompt/context bookkeeping to the payload and wrap it in one `ArtifactRecord`.
///
/// Returns the result payload, a one-element vector holding the produced artifact, and the
/// transcript after the output transcript policy has been applied.
///
/// # Errors
///
/// Returns `VmError::Runtime` when the input contract is violated (missing transcript, too few
/// or too many artifacts), when the verify command cannot be spawned, or when the tool-derived
/// policy cannot be intersected with the node's capability policy. Errors from
/// `extract_llm_options`, the agent loop and `vm_call_llm_full` are propagated.
pub async fn execute_stage_node(
    node_id: &str,
    node: &WorkflowNode,
    task: &str,
    artifacts: &[ArtifactRecord],
    transcript: Option<VmValue>,
) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
    // Selection uses a copy of the context policy whose `include_kinds` may be filled from the
    // input contract; rendering below uses the node's unmodified context policy.
    let mut selection_policy = node.context_policy.clone();
    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
    }
    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
    let input_transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
    // Input contract checks.
    if node.input_contract.require_transcript && input_transcript.is_none() {
        return Err(VmError::Runtime(format!(
            "workflow stage {node_id} requires transcript input"
        )));
    }
    if let Some(min_inputs) = node.input_contract.min_inputs {
        if selected.len() < min_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
            )));
        }
    }
    if let Some(max_inputs) = node.input_contract.max_inputs {
        if selected.len() > max_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
            )));
        }
    }
    // With no rendered context the prompt is just the task; otherwise the context comes first
    // and the task follows under its label ("Task" by default).
    let prompt = if rendered_context.is_empty() {
        task.to_string()
    } else {
        format!(
            "{rendered_context}\n\n{}:\n{task}",
            node.task_label
                .clone()
                .unwrap_or_else(|| "Task".to_string())
        )
    };

    // Tool-call format: a non-blank HARN_AGENT_TOOL_FORMAT wins; otherwise derive the default
    // from the HARN_LLM_MODEL / HARN_LLM_PROVIDER environment variables.
    let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
        .ok()
        .filter(|value| !value.trim().is_empty())
        .unwrap_or_else(|| {
            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
            crate::llm_config::default_tool_format(&model, &provider)
        });
    let mut llm_result = if node.kind == "verify" {
        // Verify nodes: run the configured shell command, if there is a non-blank one.
        if let Some(command) = node
            .verify
            .as_ref()
            .and_then(|verify| verify.as_object())
            .and_then(|verify| verify.get("command"))
            .and_then(|value| value.as_str())
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            // `cmd /C <command>` on Windows, `/bin/sh -lc <command>` everywhere else.
            let mut process = if cfg!(target_os = "windows") {
                let mut cmd = tokio::process::Command::new("cmd");
                cmd.arg("/C").arg(command);
                cmd
            } else {
                let mut cmd = tokio::process::Command::new("/bin/sh");
                cmd.arg("-lc").arg(command);
                cmd
            };
            process.stdin(std::process::Stdio::null());
            // Apply the working directory and environment of the current execution context,
            // when one is available.
            if let Some(context) = crate::stdlib::process::current_execution_context() {
                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
                    process.current_dir(cwd);
                }
                if !context.env.is_empty() {
                    process.envs(context.env);
                }
            }
            let output = process
                .output()
                .await
                .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
            // Combined text: stdout then stderr, newline-separated when both are non-empty.
            let combined = if stderr.is_empty() {
                stdout.clone()
            } else if stdout.is_empty() {
                stderr.clone()
            } else {
                format!("{stdout}\n{stderr}")
            };
            // "status" is "completed" whatever the exit code; the outcome is carried by
            // "exit_status" (-1 when no exit code is available) and "success".
            serde_json::json!({
                "status": "completed",
                "text": combined,
                "visible_text": combined,
                "command": command,
                "stdout": stdout,
                "stderr": stderr,
                "exit_status": output.status.code().unwrap_or(-1),
                "success": output.status.success(),
            })
        } else {
            // No command configured: produce an empty completed payload.
            serde_json::json!({
                "status": "completed",
                "text": "",
                "visible_text": "",
            })
        }
    } else {
        // Every other node kind: assemble LLM options from the node's model policy.
        let mut options = BTreeMap::new();
        if let Some(provider) = &node.model_policy.provider {
            options.insert(
                "provider".to_string(),
                VmValue::String(Rc::from(provider.clone())),
            );
        }
        if let Some(model) = &node.model_policy.model {
            options.insert(
                "model".to_string(),
                VmValue::String(Rc::from(model.clone())),
            );
        }
        if let Some(model_tier) = &node.model_policy.model_tier {
            options.insert(
                "model_tier".to_string(),
                VmValue::String(Rc::from(model_tier.clone())),
            );
        }
        if let Some(temperature) = node.model_policy.temperature {
            options.insert("temperature".to_string(), VmValue::Float(temperature));
        }
        if let Some(max_tokens) = node.model_policy.max_tokens {
            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
        }
        let tool_names = workflow_tool_names(&node.tools);
        // Prefer the raw VM tools value; fall back to converting the JSON form when non-null.
        let tools_value = node.raw_tools.clone().or_else(|| {
            if matches!(node.tools, serde_json::Value::Null) {
                None
            } else {
                Some(crate::stdlib::json_to_vm_value(&node.tools))
            }
        });
        // Tools are only passed on when at least one named tool was declared.
        if tools_value.is_some() && !tool_names.is_empty() {
            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
        }
        if let Some(transcript) = input_transcript.clone() {
            options.insert("transcript".to_string(), transcript);
        }

        // Argument list handed to `extract_llm_options`: prompt, system prompt (or Nil),
        // options dictionary.
        let args = vec![
            VmValue::String(Rc::from(prompt.clone())),
            node.system
                .clone()
                .map(|s| VmValue::String(Rc::from(s)))
                .unwrap_or(VmValue::Nil),
            VmValue::Dict(Rc::new(options)),
        ];
        let mut opts = extract_llm_options(&args)?;

        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
            // Agent loop. The effective policy is the tool-derived policy intersected with the
            // node's own capability policy.
            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
            let effective_policy = tool_policy
                .intersect(&node.capability_policy)
                .map_err(VmError::Runtime)?;
            let auto_compact = if node.transcript_policy.auto_compact {
                // Start from the default config and override whatever the policy specifies.
                let mut ac = crate::orchestration::AutoCompactConfig::default();
                if let Some(v) = node.transcript_policy.compact_threshold {
                    ac.token_threshold = v;
                }
                if let Some(v) = node.transcript_policy.tool_output_max_chars {
                    ac.tool_output_max_chars = v;
                }
                // A strategy string that fails to parse is ignored and the default is kept.
                if let Some(ref strategy) = node.transcript_policy.compact_strategy {
                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
                        ac.compact_strategy = s;
                    }
                }
                if let Some(v) = node.transcript_policy.hard_limit_tokens {
                    ac.hard_limit_tokens = Some(v);
                }
                if let Some(ref strategy) = node.transcript_policy.hard_limit_strategy {
                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
                        ac.hard_limit_strategy = s;
                    }
                }
                // The callbacks are read from the raw VM transcript policy rather than from
                // the typed `transcript_policy`.
                if let Some(ref raw_tp) = node.raw_transcript_policy {
                    if let Some(dict) = raw_tp.as_dict() {
                        if let Some(cb) = dict.get("compress_callback") {
                            ac.compress_callback = Some(cb.clone());
                        }
                        if let Some(cb) = dict.get("mask_callback") {
                            ac.mask_callback = Some(cb.clone());
                        }
                    }
                }
                {
                    // Tell the provider adaptation which limits were set explicitly by the
                    // policy.
                    let user_specified_threshold =
                        node.transcript_policy.compact_threshold.is_some();
                    let user_specified_hard_limit =
                        node.transcript_policy.hard_limit_tokens.is_some();
                    crate::llm::api::adapt_auto_compact_to_provider(
                        &mut ac,
                        user_specified_threshold,
                        user_specified_hard_limit,
                        &opts.provider,
                        &opts.model,
                        &opts.api_key,
                    )
                    .await;
                }
                Some(ac)
            } else {
                None
            };
            crate::llm::run_agent_loop_internal(
                &mut opts,
                crate::llm::AgentLoopConfig {
                    persistent: true,
                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
                    nudge: node.model_policy.nudge.clone(),
                    done_sentinel: node.done_sentinel.clone(),
                    break_unless_phase: None,
                    tool_retries: 0,
                    tool_backoff_ms: 1000,
                    tool_format: tool_format.clone(),
                    auto_compact,
                    context_callback: None,
                    policy: Some(effective_policy),
                    approval_policy: Some(node.approval_policy.clone()),
                    daemon: false,
                    daemon_config: Default::default(),
                    llm_retries: 2,
                    llm_backoff_ms: 2000,
                    exit_when_verified: node.exit_when_verified,
                    loop_detect_warn: 2,
                    loop_detect_block: 3,
                    loop_detect_skip: 4,
                    tool_examples: node.model_policy.tool_examples.clone(),
                    // The three callbacks below come from the raw VM model policy.
                    post_turn_callback: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("post_turn_callback"))
                        .cloned(),
                    turn_policy: node.model_policy.turn_policy.clone(),
                    stop_after_successful_tools: node
                        .model_policy
                        .stop_after_successful_tools
                        .clone(),
                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
                    on_tool_call: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("on_tool_call"))
                        .cloned(),
                    on_tool_result: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("on_tool_result"))
                        .cloned(),
                },
            )
            .await?
        } else {
            // Plain single-call path: one LLM call, converted to the agent-loop result shape.
            let result = vm_call_llm_full(&opts).await?;
            crate::llm::agent_loop_result_from_llm(&result, opts)
        }
    };
    // Record what was sent and which artifacts were used alongside the result payload.
    if let Some(payload) = llm_result.as_object_mut() {
        payload.insert("prompt".to_string(), serde_json::json!(prompt));
        payload.insert(
            "system_prompt".to_string(),
            serde_json::json!(node.system.clone().unwrap_or_default()),
        );
        payload.insert(
            "rendered_context".to_string(),
            serde_json::json!(rendered_context),
        );
        payload.insert(
            "selected_artifact_ids".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.id.clone())
                .collect::<Vec<_>>()),
        );
        payload.insert(
            "selected_artifact_titles".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.title.clone())
                .collect::<Vec<_>>()),
        );
        payload.insert(
            "tool_calling_mode".to_string(),
            serde_json::json!(tool_format.clone()),
        );
    }

    // The artifact text is the payload's "text" field (empty when missing or not a string).
    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
    // Prefer the transcript carried in the result payload; fall back to the input transcript.
    let result_transcript = llm_result
        .get("transcript")
        .cloned()
        .map(|value| crate::stdlib::json_to_vm_value(&value));
    let transcript = apply_output_transcript_policy(
        result_transcript.or(input_transcript),
        &node.transcript_policy,
    );
    // Output kind: the first declared kind, else a default based on the node kind.
    let output_kind = node
        .output_contract
        .output_kinds
        .first()
        .cloned()
        .unwrap_or_else(|| {
            if node.kind == "verify" {
                "verification_result".to_string()
            } else {
                "artifact".to_string()
            }
        });
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "input_artifact_ids".to_string(),
        serde_json::json!(selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect::<Vec<_>>()),
    );
    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
    // The artifact stores the whole payload as `data` and lists the selected inputs as lineage.
    let artifact = ArtifactRecord {
        type_name: "artifact".to_string(),
        id: new_id("artifact"),
        kind: output_kind,
        title: Some(format!("stage {node_id} output")),
        text: Some(visible_text),
        data: Some(llm_result.clone()),
        source: Some(node_id.to_string()),
        created_at: now_rfc3339(),
        freshness: Some("fresh".to_string()),
        priority: None,
        lineage: selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect(),
        relevance: Some(1.0),
        estimated_tokens: None,
        stage: Some(node_id.to_string()),
        metadata,
    }
    .normalize();

    Ok((llm_result, vec![artifact], transcript))
}
972
973pub fn next_nodes_for(
974 graph: &WorkflowGraph,
975 current: &str,
976 branch: Option<&str>,
977) -> Vec<WorkflowEdge> {
978 let mut matching: Vec<WorkflowEdge> = graph
979 .edges
980 .iter()
981 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
982 .cloned()
983 .collect();
984 if matching.is_empty() {
985 matching = graph
986 .edges
987 .iter()
988 .filter(|edge| edge.from == current && edge.branch.is_none())
989 .cloned()
990 .collect();
991 }
992 matching
993}
994
995pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
996 next_nodes_for(graph, current, Some(branch))
997 .into_iter()
998 .next()
999 .map(|edge| edge.to)
1000}
1001
1002pub fn append_audit_entry(
1003 graph: &mut WorkflowGraph,
1004 op: &str,
1005 node_id: Option<String>,
1006 reason: Option<String>,
1007 metadata: BTreeMap<String, serde_json::Value>,
1008) {
1009 graph.audit_log.push(WorkflowAuditEntry {
1010 id: new_id("audit"),
1011 op: op.to_string(),
1012 node_id,
1013 timestamp: now_rfc3339(),
1014 reason,
1015 metadata,
1016 });
1017}