1use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 apply_input_transcript_policy, apply_output_transcript_policy, new_id, now_rfc3339,
10 ArtifactRecord, BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy,
11 MapPolicy, ModelPolicy, ReducePolicy, RetryPolicy, StageContract, ToolRuntimePolicyMetadata,
12 TranscriptPolicy,
13};
14use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
15use crate::value::{VmError, VmValue};
16
/// A single node of a workflow graph.
///
/// The struct is `#[serde(default)]`, so any field missing from the input is
/// filled with its `Default` value. The three `raw_*` fields are skipped by
/// serde entirely; they hold the original VM values and are populated from the
/// source dict (see `parse_workflow_node_value` and `normalize_workflow_value`).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier; `normalize_workflow_value` sets it to the node's map key when absent.
    pub id: Option<String>,
    /// Node kind. Normalization defaults an empty kind to `"stage"`; validation
    /// special-cases `"condition"`, `"fork"`, `"join"`, `"map"` and `"reduce"`,
    /// and stage execution special-cases `"verify"`.
    pub kind: String,
    /// Execution mode; `Some("agent")` makes `execute_stage_node` run the agent loop.
    pub mode: Option<String>,
    /// Optional prompt text for the node.
    pub prompt: Option<String>,
    /// System prompt handed to the LLM call for this node.
    pub system: Option<String>,
    /// Heading placed before the task text when artifact context is rendered
    /// into the prompt (falls back to `"Task"`).
    pub task_label: Option<String>,
    /// Sentinel forwarded to the agent loop configuration.
    pub done_sentinel: Option<String>,
    /// JSON description of the node's tools: an array of objects with a `name`,
    /// a single object with a `name`, or an object tagged `_type = "tool_registry"`
    /// holding a `tools` entry (see `workflow_tool_names`).
    pub tools: serde_json::Value,
    /// Model selection and agent-loop settings (provider, model, tier, temperature, ...).
    pub model_policy: ModelPolicy,
    /// Policy applied to the input and output transcript; also drives auto-compaction.
    pub transcript_policy: TranscriptPolicy,
    /// Policy used to select and render input artifacts.
    pub context_policy: ContextPolicy,
    /// Retry settings; normalization raises `max_attempts` from 0 to 1.
    pub retry_policy: RetryPolicy,
    /// Capabilities requested by this node; checked against the ceiling in `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    /// Constraints on the node's inputs (kinds, min/max count, transcript requirement).
    pub input_contract: StageContract,
    /// Declared outputs; the first `output_kinds` entry becomes the kind of the produced artifact.
    pub output_contract: StageContract,
    /// Branch semantics for this node.
    pub branch_semantics: BranchSemantics,
    /// Settings for `"map"` nodes.
    pub map_policy: MapPolicy,
    /// Settings for `"join"` nodes; normalization defaults `strategy` to `"all"`.
    pub join_policy: JoinPolicy,
    /// Settings for `"reduce"` nodes; normalization defaults `strategy` to `"concat"`.
    pub reduce_policy: ReducePolicy,
    /// Settings for escalation handling.
    pub escalation_policy: EscalationPolicy,
    /// Verification spec; for `"verify"` nodes its `command` entry is run as a shell command.
    pub verify: Option<serde_json::Value>,
    /// Optional timeout, presumably in milliseconds.
    // NOTE(review): not consulted by any code visible in this part of the file — confirm where it is enforced.
    #[serde(default)]
    pub timeout_ms: Option<u64>,
    /// Free-form metadata attached to the node.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Original VM value of `tools`; preferred over the JSON `tools` when tools are passed to the LLM.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Original VM value of `transcript_policy`; `compress_callback` and `mask_callback` are read from it.
    #[serde(skip)]
    pub raw_transcript_policy: Option<VmValue>,
    /// Original VM value of `model_policy`; `post_turn_callback`, `on_tool_call`
    /// and `on_tool_result` are read from it.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
}
58
59impl PartialEq for WorkflowNode {
60 fn eq(&self, other: &Self) -> bool {
61 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
62 }
63}
64
65pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
66 match value {
67 serde_json::Value::Null => Vec::new(),
68 serde_json::Value::Array(items) => items
69 .iter()
70 .filter_map(|item| match item {
71 serde_json::Value::Object(map) => map
72 .get("name")
73 .and_then(|value| value.as_str())
74 .filter(|name| !name.is_empty())
75 .map(|name| name.to_string()),
76 _ => None,
77 })
78 .collect(),
79 serde_json::Value::Object(map) => {
80 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
81 return map
82 .get("tools")
83 .map(workflow_tool_names)
84 .unwrap_or_default();
85 }
86 map.get("name")
87 .and_then(|value| value.as_str())
88 .filter(|name| !name.is_empty())
89 .map(|name| vec![name.to_string()])
90 .unwrap_or_default()
91 }
92 _ => Vec::new(),
93 }
94}
95
/// Picks the most permissive side-effect level out of `levels`.
///
/// Known levels are ordered `none` < `read_only` < `workspace_write` <
/// `process_exec` < `network`; any other string outranks all of them. When
/// several levels share the highest rank the last one seen wins, and an empty
/// iterator yields `None`.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const ORDERED: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    // Unknown levels rank just above the last known one.
    let severity = |level: &str| {
        ORDERED
            .iter()
            .position(|known| *known == level)
            .unwrap_or(ORDERED.len())
    };

    levels.fold(None, |strongest: Option<String>, candidate| match strongest {
        // Only a strictly higher rank keeps the earlier value, so ties go to the later one.
        Some(current) if severity(&current) > severity(&candidate) => Some(current),
        _ => Some(candidate),
    })
}
109
110fn parse_tool_runtime_policy(
111 map: &serde_json::Map<String, serde_json::Value>,
112) -> ToolRuntimePolicyMetadata {
113 let Some(policy) = map.get("policy").and_then(|value| value.as_object()) else {
114 return ToolRuntimePolicyMetadata::default();
115 };
116
117 let capabilities = policy
118 .get("capabilities")
119 .and_then(|value| value.as_object())
120 .map(|caps| {
121 caps.iter()
122 .map(|(capability, ops)| {
123 let values = ops
124 .as_array()
125 .map(|items| {
126 items
127 .iter()
128 .filter_map(|item| item.as_str().map(|s| s.to_string()))
129 .collect::<Vec<_>>()
130 })
131 .unwrap_or_default();
132 (capability.clone(), values)
133 })
134 .collect::<BTreeMap<_, _>>()
135 })
136 .unwrap_or_default();
137
138 let path_params = policy
139 .get("path_params")
140 .and_then(|value| value.as_array())
141 .map(|items| {
142 items
143 .iter()
144 .filter_map(|item| item.as_str().map(|s| s.to_string()))
145 .collect::<Vec<_>>()
146 })
147 .unwrap_or_default();
148
149 ToolRuntimePolicyMetadata {
150 capabilities,
151 side_effect_level: policy
152 .get("side_effect_level")
153 .and_then(|value| value.as_str())
154 .map(|s| s.to_string()),
155 path_params,
156 mutation_classification: policy
157 .get("mutation_classification")
158 .and_then(|value| value.as_str())
159 .map(|s| s.to_string()),
160 }
161}
162
163pub fn workflow_tool_metadata(
164 value: &serde_json::Value,
165) -> BTreeMap<String, ToolRuntimePolicyMetadata> {
166 match value {
167 serde_json::Value::Null => BTreeMap::new(),
168 serde_json::Value::Array(items) => items
169 .iter()
170 .filter_map(|item| match item {
171 serde_json::Value::Object(map) => map
172 .get("name")
173 .and_then(|value| value.as_str())
174 .filter(|name| !name.is_empty())
175 .map(|name| (name.to_string(), parse_tool_runtime_policy(map))),
176 _ => None,
177 })
178 .collect(),
179 serde_json::Value::Object(map) => {
180 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
181 return map
182 .get("tools")
183 .map(workflow_tool_metadata)
184 .unwrap_or_default();
185 }
186 map.get("name")
187 .and_then(|value| value.as_str())
188 .filter(|name| !name.is_empty())
189 .map(|name| {
190 let mut metadata = BTreeMap::new();
191 metadata.insert(name.to_string(), parse_tool_runtime_policy(map));
192 metadata
193 })
194 .unwrap_or_default()
195 }
196 _ => BTreeMap::new(),
197 }
198}
199
200pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
201 let tools = workflow_tool_names(value);
202 let tool_metadata = workflow_tool_metadata(value);
203 let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
204 for metadata in tool_metadata.values() {
205 for (capability, ops) in &metadata.capabilities {
206 let entry = capabilities.entry(capability.clone()).or_default();
207 for op in ops {
208 if !entry.contains(op) {
209 entry.push(op.clone());
210 }
211 }
212 entry.sort();
213 }
214 }
215 let side_effect_level = max_side_effect_level(
216 tool_metadata
217 .values()
218 .filter_map(|metadata| metadata.side_effect_level.clone()),
219 );
220 CapabilityPolicy {
221 tools,
222 capabilities,
223 workspace_roots: Vec::new(),
224 side_effect_level,
225 recursion_limit: None,
226 tool_arg_constraints: Vec::new(),
227 tool_metadata,
228 }
229}
230
/// A directed edge between two workflow nodes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the source node; `validate_workflow` reports an error if it is not a known node.
    pub from: String,
    /// Id of the target node; `validate_workflow` reports an error if it is not a known node.
    pub to: String,
    /// Branch name this edge is taken on. Edges without a branch serve as the
    /// fallback when no edge matches the requested branch (see `next_nodes_for`).
    pub branch: Option<String>,
    /// Optional label for the edge.
    pub label: Option<String>,
}
239
/// A workflow: nodes, the edges between them, and graph-level policy and bookkeeping.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Type tag, serialized as `_type`; normalization defaults it to `"workflow_graph"`.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph identifier; normalization generates one when empty.
    pub id: String,
    /// Optional name of the workflow.
    pub name: Option<String>,
    /// Graph version; normalization raises 0 to 1.
    pub version: usize,
    /// Id of the entry node; `validate_workflow` reports an error if it does not exist.
    pub entry: String,
    /// Nodes keyed by node id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    /// Directed edges between nodes.
    pub edges: Vec<WorkflowEdge>,
    /// Graph-level capability policy; checked against the ceiling in `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    /// Free-form metadata attached to the graph.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Audit entries, appended by `append_audit_entry`.
    pub audit_log: Vec<WorkflowAuditEntry>,
}
255
/// One record in a workflow graph's audit log (see `append_audit_entry`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    /// Identifier of the entry, generated when the entry is appended.
    pub id: String,
    /// Name of the operation being recorded.
    pub op: String,
    /// Id of the node the operation concerns, if any.
    pub node_id: Option<String>,
    /// Time the entry was appended, as produced by `now_rfc3339`.
    pub timestamp: String,
    /// Optional reason for the operation.
    pub reason: Option<String>,
    /// Free-form metadata attached to the entry.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
266
/// Result of `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// `true` exactly when `errors` is empty.
    pub valid: bool,
    /// Problems that make the graph invalid.
    pub errors: Vec<String>,
    /// Problems that do not invalidate the graph (e.g. unreachable nodes).
    pub warnings: Vec<String>,
    /// Ids reached by following edges from the entry node, in sorted order.
    pub reachable_nodes: Vec<String>,
}
275
276pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
277 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
278 let dict = value.as_dict();
279 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
280 node.raw_transcript_policy = dict.and_then(|d| d.get("transcript_policy")).cloned();
281 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
282 Ok(node)
283}
284
/// Deserializes a [`WorkflowNode`] from a JSON value.
///
/// Delegates to `super::parse_json_payload`, passing `label` through. No VM
/// value is involved here, so nothing in this function fills the serde-skipped
/// `raw_*` fields.
///
/// # Errors
/// Propagates the error from `super::parse_json_payload`.
pub fn parse_workflow_node_json(
    json: serde_json::Value,
    label: &str,
) -> Result<WorkflowNode, VmError> {
    super::parse_json_payload(json, label)
}
291
/// Deserializes a [`WorkflowEdge`] from a JSON value.
///
/// Delegates to `super::parse_json_payload`, passing `label` through.
///
/// # Errors
/// Propagates the error from `super::parse_json_payload`.
pub fn parse_workflow_edge_json(
    json: serde_json::Value,
    label: &str,
) -> Result<WorkflowEdge, VmError> {
    super::parse_json_payload(json, label)
}
298
299pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
300 let mut graph: WorkflowGraph = super::parse_json_value(value)?;
301 let as_dict = value.as_dict().cloned().unwrap_or_default();
302
303 if graph.nodes.is_empty() {
304 for key in ["act", "verify", "repair"] {
305 if let Some(node_value) = as_dict.get(key) {
306 let mut node = parse_workflow_node_value(node_value, "orchestration")?;
307 let raw_node = node_value.as_dict().cloned().unwrap_or_default();
308 node.id = Some(key.to_string());
309 if node.kind.is_empty() {
310 node.kind = if key == "verify" {
311 "verify".to_string()
312 } else {
313 "stage".to_string()
314 };
315 }
316 if node.model_policy.provider.is_none() {
317 node.model_policy.provider = as_dict
318 .get("provider")
319 .map(|value| value.display())
320 .filter(|value| !value.is_empty());
321 }
322 if node.model_policy.model.is_none() {
323 node.model_policy.model = as_dict
324 .get("model")
325 .map(|value| value.display())
326 .filter(|value| !value.is_empty());
327 }
328 if node.model_policy.model_tier.is_none() {
329 node.model_policy.model_tier = as_dict
330 .get("model_tier")
331 .or_else(|| as_dict.get("tier"))
332 .map(|value| value.display())
333 .filter(|value| !value.is_empty());
334 }
335 if node.model_policy.temperature.is_none() {
336 node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
337 if let VmValue::Float(number) = value {
338 Some(*number)
339 } else {
340 value.as_int().map(|number| number as f64)
341 }
342 });
343 }
344 if node.model_policy.max_tokens.is_none() {
345 node.model_policy.max_tokens =
346 as_dict.get("max_tokens").and_then(|value| value.as_int());
347 }
348 if node.mode.is_none() {
349 node.mode = as_dict
350 .get("mode")
351 .map(|value| value.display())
352 .filter(|value| !value.is_empty());
353 }
354 if node.done_sentinel.is_none() {
355 node.done_sentinel = as_dict
356 .get("done_sentinel")
357 .map(|value| value.display())
358 .filter(|value| !value.is_empty());
359 }
360 if key == "verify"
361 && node.verify.is_none()
362 && (raw_node.contains_key("assert_text")
363 || raw_node.contains_key("command")
364 || raw_node.contains_key("expect_status")
365 || raw_node.contains_key("expect_text"))
366 {
367 node.verify = Some(serde_json::json!({
368 "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
369 "command": raw_node.get("command").map(vm_value_to_json),
370 "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
371 "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
372 }));
373 }
374 graph.nodes.insert(key.to_string(), node);
375 }
376 }
377 if graph.entry.is_empty() && graph.nodes.contains_key("act") {
378 graph.entry = "act".to_string();
379 }
380 if graph.edges.is_empty() && graph.nodes.contains_key("act") {
381 if graph.nodes.contains_key("verify") {
382 graph.edges.push(WorkflowEdge {
383 from: "act".to_string(),
384 to: "verify".to_string(),
385 branch: None,
386 label: None,
387 });
388 }
389 if graph.nodes.contains_key("repair") {
390 graph.edges.push(WorkflowEdge {
391 from: "verify".to_string(),
392 to: "repair".to_string(),
393 branch: Some("failed".to_string()),
394 label: None,
395 });
396 graph.edges.push(WorkflowEdge {
397 from: "repair".to_string(),
398 to: "verify".to_string(),
399 branch: Some("retry".to_string()),
400 label: None,
401 });
402 }
403 }
404 }
405
406 if graph.type_name.is_empty() {
407 graph.type_name = "workflow_graph".to_string();
408 }
409 if graph.id.is_empty() {
410 graph.id = new_id("workflow");
411 }
412 if graph.version == 0 {
413 graph.version = 1;
414 }
415 if graph.entry.is_empty() {
416 graph.entry = graph
417 .nodes
418 .keys()
419 .next()
420 .cloned()
421 .unwrap_or_else(|| "act".to_string());
422 }
423 for (node_id, node) in &mut graph.nodes {
424 if node.raw_tools.is_none() {
425 node.raw_tools = as_dict
426 .get("nodes")
427 .and_then(|nodes| nodes.as_dict())
428 .and_then(|nodes| nodes.get(node_id))
429 .and_then(|node_value| node_value.as_dict())
430 .and_then(|raw_node| raw_node.get("tools"))
431 .cloned();
432 }
433 if node.id.is_none() {
434 node.id = Some(node_id.clone());
435 }
436 if node.kind.is_empty() {
437 node.kind = "stage".to_string();
438 }
439 if node.join_policy.strategy.is_empty() {
440 node.join_policy.strategy = "all".to_string();
441 }
442 if node.reduce_policy.strategy.is_empty() {
443 node.reduce_policy.strategy = "concat".to_string();
444 }
445 if node.output_contract.output_kinds.is_empty() {
446 node.output_contract.output_kinds = vec![match node.kind.as_str() {
447 "verify" => "verification_result".to_string(),
448 "reduce" => node
449 .reduce_policy
450 .output_kind
451 .clone()
452 .unwrap_or_else(|| "summary".to_string()),
453 "map" => node
454 .map_policy
455 .output_kind
456 .clone()
457 .unwrap_or_else(|| "artifact".to_string()),
458 "escalation" => "plan".to_string(),
459 _ => "artifact".to_string(),
460 }];
461 }
462 if node.retry_policy.max_attempts == 0 {
463 node.retry_policy.max_attempts = 1;
464 }
465 }
466 Ok(graph)
467}
468
469pub fn validate_workflow(
470 graph: &WorkflowGraph,
471 ceiling: Option<&CapabilityPolicy>,
472) -> WorkflowValidationReport {
473 let mut errors = Vec::new();
474 let mut warnings = Vec::new();
475
476 if !graph.nodes.contains_key(&graph.entry) {
477 errors.push(format!("entry node does not exist: {}", graph.entry));
478 }
479
480 let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
481 for edge in &graph.edges {
482 if !node_ids.contains(&edge.from) {
483 errors.push(format!("edge.from references unknown node: {}", edge.from));
484 }
485 if !node_ids.contains(&edge.to) {
486 errors.push(format!("edge.to references unknown node: {}", edge.to));
487 }
488 }
489
490 let reachable_nodes = reachable_nodes(graph);
491 for node_id in &node_ids {
492 if !reachable_nodes.contains(node_id) {
493 warnings.push(format!("node is unreachable: {node_id}"));
494 }
495 }
496
497 for (node_id, node) in &graph.nodes {
498 let incoming = graph
499 .edges
500 .iter()
501 .filter(|edge| edge.to == *node_id)
502 .count();
503 let outgoing: Vec<&WorkflowEdge> = graph
504 .edges
505 .iter()
506 .filter(|edge| edge.from == *node_id)
507 .collect();
508 if let Some(min_inputs) = node.input_contract.min_inputs {
509 if let Some(max_inputs) = node.input_contract.max_inputs {
510 if min_inputs > max_inputs {
511 errors.push(format!(
512 "node {node_id}: input contract min_inputs exceeds max_inputs"
513 ));
514 }
515 }
516 }
517 match node.kind.as_str() {
518 "condition" => {
519 let has_true = outgoing
520 .iter()
521 .any(|edge| edge.branch.as_deref() == Some("true"));
522 let has_false = outgoing
523 .iter()
524 .any(|edge| edge.branch.as_deref() == Some("false"));
525 if !has_true || !has_false {
526 errors.push(format!(
527 "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
528 ));
529 }
530 }
531 "fork" => {
532 if outgoing.len() < 2 {
533 errors.push(format!(
534 "node {node_id}: fork nodes require at least two outgoing edges"
535 ));
536 }
537 }
538 "join" => {
539 if incoming < 2 {
540 warnings.push(format!(
541 "node {node_id}: join node has fewer than two incoming edges"
542 ));
543 }
544 }
545 "map" => {
546 if node.map_policy.items.is_empty()
547 && node.map_policy.item_artifact_kind.is_none()
548 && node.input_contract.input_kinds.is_empty()
549 {
550 errors.push(format!(
551 "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
552 ));
553 }
554 }
555 "reduce" => {
556 if node.input_contract.input_kinds.is_empty() {
557 warnings.push(format!(
558 "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
559 ));
560 }
561 }
562 _ => {}
563 }
564 }
565
566 if let Some(ceiling) = ceiling {
567 if let Err(error) = ceiling.intersect(&graph.capability_policy) {
568 errors.push(error);
569 }
570 for (node_id, node) in &graph.nodes {
571 if let Err(error) = ceiling.intersect(&node.capability_policy) {
572 errors.push(format!("node {node_id}: {error}"));
573 }
574 }
575 }
576
577 WorkflowValidationReport {
578 valid: errors.is_empty(),
579 errors,
580 warnings,
581 reachable_nodes: reachable_nodes.into_iter().collect(),
582 }
583}
584
585fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
586 let mut seen = BTreeSet::new();
587 let mut stack = vec![graph.entry.clone()];
588 while let Some(node_id) = stack.pop() {
589 if !seen.insert(node_id.clone()) {
590 continue;
591 }
592 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
593 stack.push(edge.to.clone());
594 }
595 }
596 seen
597}
598
/// Executes a single workflow stage.
///
/// Steps, in order:
/// 1. Select input artifacts and render them into a context string.
/// 2. Apply the input transcript policy and enforce the input contract
///    (transcript requirement, minimum and maximum number of inputs).
/// 3. Build the prompt from the rendered context and `task`.
/// 4. Produce a result: `verify` nodes run their configured shell command (or
///    return an empty completed result when none is configured); all other
///    nodes call the LLM, through the agent loop when the node is in `agent`
///    mode or declares tools, and through a single LLM call otherwise.
/// 5. Annotate the result with prompt/context details and wrap it in one
///    output artifact.
///
/// Returns the result payload, the produced artifacts (always exactly one),
/// and the transcript after the output transcript policy has been applied.
///
/// # Errors
/// Returns `VmError::Runtime` when a required transcript is missing, when the
/// number of selected artifacts violates the input contract, or when the
/// verify command cannot be executed. Errors from `extract_llm_options`, the
/// agent loop and the LLM call are propagated.
// NOTE(review): `node.timeout_ms` is not consulted in this function, so the
// verify command runs without a timeout here — confirm whether a caller enforces it.
pub async fn execute_stage_node(
    node_id: &str,
    node: &WorkflowNode,
    task: &str,
    artifacts: &[ArtifactRecord],
    transcript: Option<VmValue>,
) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
    // Selection falls back to the input contract's kinds when the context
    // policy does not name any kinds itself.
    let mut selection_policy = node.context_policy.clone();
    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
    }
    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
    // Rendering uses the node's own context policy, not the adjusted selection policy.
    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
    let transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
    if node.input_contract.require_transcript && transcript.is_none() {
        return Err(VmError::Runtime(format!(
            "workflow stage {node_id} requires transcript input"
        )));
    }
    if let Some(min_inputs) = node.input_contract.min_inputs {
        if selected.len() < min_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
            )));
        }
    }
    if let Some(max_inputs) = node.input_contract.max_inputs {
        if selected.len() > max_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
            )));
        }
    }
    // Without rendered context the task is the whole prompt; otherwise the
    // context comes first, followed by the labelled task.
    let prompt = if rendered_context.is_empty() {
        task.to_string()
    } else {
        format!(
            "{rendered_context}\n\n{}:\n{task}",
            node.task_label
                .clone()
                .unwrap_or_else(|| "Task".to_string())
        )
    };

    // A non-blank HARN_AGENT_TOOL_FORMAT wins; otherwise the format is derived
    // from the HARN_LLM_MODEL / HARN_LLM_PROVIDER environment variables.
    let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
        .ok()
        .filter(|value| !value.trim().is_empty())
        .unwrap_or_else(|| {
            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
            crate::llm_config::default_tool_format(&model, &provider)
        });
    let mut llm_result = if node.kind == "verify" {
        // Verify nodes run the trimmed, non-empty `verify.command` string, if any.
        if let Some(command) = node
            .verify
            .as_ref()
            .and_then(|verify| verify.as_object())
            .and_then(|verify| verify.get("command"))
            .and_then(|value| value.as_str())
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            // `cmd /C` on Windows, `/bin/sh -lc` everywhere else.
            let mut process = if cfg!(target_os = "windows") {
                let mut cmd = tokio::process::Command::new("cmd");
                cmd.arg("/C").arg(command);
                cmd
            } else {
                let mut cmd = tokio::process::Command::new("/bin/sh");
                cmd.arg("-lc").arg(command);
                cmd
            };
            process.stdin(std::process::Stdio::null());
            // Working directory and environment come from the current execution context, when set.
            if let Some(context) = crate::stdlib::process::current_execution_context() {
                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
                    process.current_dir(cwd);
                }
                if !context.env.is_empty() {
                    process.envs(context.env);
                }
            }
            let output = process
                .output()
                .await
                .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
            // The visible text is stdout and stderr joined by a newline,
            // or whichever of the two is non-empty.
            let combined = if stderr.is_empty() {
                stdout.clone()
            } else if stdout.is_empty() {
                stderr.clone()
            } else {
                format!("{stdout}\n{stderr}")
            };
            // `status` is "completed" regardless of the exit code; the outcome
            // is carried by `exit_status` (-1 when no code is available) and `success`.
            serde_json::json!({
                "status": "completed",
                "text": combined,
                "visible_text": combined,
                "command": command,
                "stdout": stdout,
                "stderr": stderr,
                "exit_status": output.status.code().unwrap_or(-1),
                "success": output.status.success(),
            })
        } else {
            // No command configured: return an empty completed result.
            serde_json::json!({
                "status": "completed",
                "text": "",
                "visible_text": "",
            })
        }
    } else {
        // Translate the node's model policy into the LLM options dict.
        let mut options = BTreeMap::new();
        if let Some(provider) = &node.model_policy.provider {
            options.insert(
                "provider".to_string(),
                VmValue::String(Rc::from(provider.clone())),
            );
        }
        if let Some(model) = &node.model_policy.model {
            options.insert(
                "model".to_string(),
                VmValue::String(Rc::from(model.clone())),
            );
        }
        if let Some(model_tier) = &node.model_policy.model_tier {
            options.insert(
                "model_tier".to_string(),
                VmValue::String(Rc::from(model_tier.clone())),
            );
        }
        if let Some(temperature) = node.model_policy.temperature {
            options.insert("temperature".to_string(), VmValue::Float(temperature));
        }
        if let Some(max_tokens) = node.model_policy.max_tokens {
            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
        }
        let tool_names = workflow_tool_names(&node.tools);
        // Prefer the original VM tools value; fall back to converting the JSON form.
        let tools_value = node.raw_tools.clone().or_else(|| {
            if matches!(node.tools, serde_json::Value::Null) {
                None
            } else {
                Some(crate::stdlib::json_to_vm_value(&node.tools))
            }
        });
        // Tools are only passed on when at least one named tool exists.
        if tools_value.is_some() && !tool_names.is_empty() {
            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
        }
        if let Some(transcript) = transcript.clone() {
            options.insert("transcript".to_string(), transcript);
        }

        // Arguments are positional: prompt, system prompt (or nil), options dict.
        let args = vec![
            VmValue::String(Rc::from(prompt.clone())),
            node.system
                .clone()
                .map(|s| VmValue::String(Rc::from(s)))
                .unwrap_or(VmValue::Nil),
            VmValue::Dict(Rc::new(options)),
        ];
        let mut opts = extract_llm_options(&args)?;

        // Agent mode, or any declared tool, routes through the agent loop.
        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
            let auto_compact = if node.transcript_policy.auto_compact {
                // Start from the defaults and override whatever the transcript policy specifies.
                let mut ac = crate::orchestration::AutoCompactConfig::default();
                if let Some(v) = node.transcript_policy.compact_threshold {
                    ac.token_threshold = v;
                }
                if let Some(v) = node.transcript_policy.tool_output_max_chars {
                    ac.tool_output_max_chars = v;
                }
                // A strategy string that fails to parse is ignored and the default kept.
                if let Some(ref strategy) = node.transcript_policy.compact_strategy {
                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
                        ac.compact_strategy = s;
                    }
                }
                if let Some(v) = node.transcript_policy.hard_limit_tokens {
                    ac.hard_limit_tokens = Some(v);
                }
                if let Some(ref strategy) = node.transcript_policy.hard_limit_strategy {
                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
                        ac.hard_limit_strategy = s;
                    }
                }
                // Callbacks are taken from the raw VM transcript policy.
                if let Some(ref raw_tp) = node.raw_transcript_policy {
                    if let Some(dict) = raw_tp.as_dict() {
                        if let Some(cb) = dict.get("compress_callback") {
                            ac.compress_callback = Some(cb.clone());
                        }
                        if let Some(cb) = dict.get("mask_callback") {
                            ac.mask_callback = Some(cb.clone());
                        }
                    }
                }
                {
                    // Tell the adapter which limits were set explicitly by the policy.
                    let user_specified_threshold =
                        node.transcript_policy.compact_threshold.is_some();
                    let user_specified_hard_limit =
                        node.transcript_policy.hard_limit_tokens.is_some();
                    crate::llm::api::adapt_auto_compact_to_provider(
                        &mut ac,
                        user_specified_threshold,
                        user_specified_hard_limit,
                        &opts.provider,
                        &opts.model,
                        &opts.api_key,
                    )
                    .await;
                }
                Some(ac)
            } else {
                None
            };
            crate::llm::run_agent_loop_internal(
                &mut opts,
                crate::llm::AgentLoopConfig {
                    persistent: true,
                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
                    nudge: node.model_policy.nudge.clone(),
                    done_sentinel: node.done_sentinel.clone(),
                    break_unless_phase: None,
                    tool_retries: 0,
                    tool_backoff_ms: 1000,
                    tool_format: tool_format.clone(),
                    auto_compact,
                    context_callback: None,
                    policy: None,
                    daemon: false,
                    daemon_config: Default::default(),
                    llm_retries: 2,
                    llm_backoff_ms: 2000,
                    exit_when_verified: false,
                    loop_detect_warn: 2,
                    loop_detect_block: 3,
                    loop_detect_skip: 4,
                    tool_examples: node.model_policy.tool_examples.clone(),
                    // The three callbacks below are read from the raw VM model policy.
                    post_turn_callback: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("post_turn_callback"))
                        .cloned(),
                    turn_policy: node.model_policy.turn_policy.clone(),
                    stop_after_successful_tools: node
                        .model_policy
                        .stop_after_successful_tools
                        .clone(),
                    on_tool_call: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("on_tool_call"))
                        .cloned(),
                    on_tool_result: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("on_tool_result"))
                        .cloned(),
                },
            )
            .await?
        } else {
            // No agent mode and no tools: a single LLM call.
            let result = vm_call_llm_full(&opts).await?;
            crate::llm::agent_loop_result_from_llm(&result, opts)
        }
    };
    // Record how the result was produced alongside the result itself
    // (only possible when the result is a JSON object).
    if let Some(payload) = llm_result.as_object_mut() {
        payload.insert("prompt".to_string(), serde_json::json!(prompt));
        payload.insert(
            "system_prompt".to_string(),
            serde_json::json!(node.system.clone().unwrap_or_default()),
        );
        payload.insert(
            "rendered_context".to_string(),
            serde_json::json!(rendered_context),
        );
        payload.insert(
            "selected_artifact_ids".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.id.clone())
                .collect::<Vec<_>>()),
        );
        payload.insert(
            "selected_artifact_titles".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.title.clone())
                .collect::<Vec<_>>()),
        );
        payload.insert(
            "tool_calling_mode".to_string(),
            serde_json::json!(tool_format.clone()),
        );
    }

    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
    // The output transcript is whatever the result carries under `transcript`, if anything.
    let transcript = llm_result
        .get("transcript")
        .cloned()
        .map(|value| crate::stdlib::json_to_vm_value(&value));
    let transcript = apply_output_transcript_policy(transcript, &node.transcript_policy);
    // The artifact kind is the first declared output kind, with a kind-based fallback.
    let output_kind = node
        .output_contract
        .output_kinds
        .first()
        .cloned()
        .unwrap_or_else(|| {
            if node.kind == "verify" {
                "verification_result".to_string()
            } else {
                "artifact".to_string()
            }
        });
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "input_artifact_ids".to_string(),
        serde_json::json!(selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect::<Vec<_>>()),
    );
    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
    // One artifact per stage run; its lineage is the ids of the selected inputs.
    let artifact = ArtifactRecord {
        type_name: "artifact".to_string(),
        id: new_id("artifact"),
        kind: output_kind,
        title: Some(format!("stage {node_id} output")),
        text: Some(visible_text),
        data: Some(llm_result.clone()),
        source: Some(node_id.to_string()),
        created_at: now_rfc3339(),
        freshness: Some("fresh".to_string()),
        priority: None,
        lineage: selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect(),
        relevance: Some(1.0),
        estimated_tokens: None,
        stage: Some(node_id.to_string()),
        metadata,
    }
    .normalize();

    Ok((llm_result, vec![artifact], transcript))
}
952
953pub fn next_nodes_for(
954 graph: &WorkflowGraph,
955 current: &str,
956 branch: Option<&str>,
957) -> Vec<WorkflowEdge> {
958 let mut matching: Vec<WorkflowEdge> = graph
959 .edges
960 .iter()
961 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
962 .cloned()
963 .collect();
964 if matching.is_empty() {
965 matching = graph
966 .edges
967 .iter()
968 .filter(|edge| edge.from == current && edge.branch.is_none())
969 .cloned()
970 .collect();
971 }
972 matching
973}
974
975pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
976 next_nodes_for(graph, current, Some(branch))
977 .into_iter()
978 .next()
979 .map(|edge| edge.to)
980}
981
982pub fn append_audit_entry(
983 graph: &mut WorkflowGraph,
984 op: &str,
985 node_id: Option<String>,
986 reason: Option<String>,
987 metadata: BTreeMap<String, serde_json::Value>,
988) {
989 graph.audit_log.push(WorkflowAuditEntry {
990 id: new_id("audit"),
991 op: op.to_string(),
992 node_id,
993 timestamp: now_rfc3339(),
994 reason,
995 metadata,
996 });
997}