1use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 apply_input_transcript_policy, apply_output_transcript_policy, new_id, now_rfc3339,
10 ArtifactRecord, BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy,
11 MapPolicy, ModelPolicy, ReducePolicy, RetryPolicy, StageContract, TranscriptPolicy,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
/// A single node (stage) of a workflow graph.
///
/// The struct-level `#[serde(default)]` means any field missing from the input
/// takes its `Default` value. The `raw_*` fields are `#[serde(skip)]`: they keep the
/// original VM values of selected fields so that VM-only content is still available
/// when the node is executed.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier; `normalize_workflow_value` fills it from the map key when absent.
    pub id: Option<String>,
    /// Node kind, e.g. "stage", "verify", "condition", "fork", "join", "map",
    /// "reduce" or "escalation"; normalization defaults an empty kind to "stage".
    pub kind: String,
    /// Execution mode; `Some("agent")` makes `execute_stage_node` use the agent loop.
    pub mode: Option<String>,
    pub prompt: Option<String>,
    /// System prompt passed to the LLM call.
    pub system: Option<String>,
    /// Label forwarded to `render_workflow_prompt`.
    pub task_label: Option<String>,
    /// Forwarded to the agent loop as `done_sentinel`.
    pub done_sentinel: Option<String>,
    /// Tool definitions as JSON: an array of tool objects, a single tool object, or a
    /// `{"_type": "tool_registry", "tools": ...}` wrapper.
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    pub transcript_policy: TranscriptPolicy,
    /// Drives artifact selection and context rendering for this stage.
    pub context_policy: ContextPolicy,
    pub retry_policy: RetryPolicy,
    /// Per-node capability policy; intersected with the tools' derived policy before
    /// running the agent loop and checked against the ceiling during validation.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    /// Input requirements (kinds, min/max input count, transcript requirement).
    pub input_contract: StageContract,
    /// Output description; its first `output_kinds` entry becomes the artifact kind.
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification spec for "verify" nodes; a non-blank string "command" entry is
    /// executed in a shell by `execute_stage_node`.
    pub verify: Option<serde_json::Value>,
    /// Forwarded to the agent loop as `exit_when_verified`.
    #[serde(default)]
    pub exit_when_verified: bool,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Original VM value of `tools`; preferred over the JSON `tools` when calling the LLM.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Original VM value of `transcript_policy`; read for `compress_callback` and
    /// `mask_callback`.
    #[serde(skip)]
    pub raw_transcript_policy: Option<VmValue>,
    /// Original VM value of `model_policy`; read for `session_id`, `task_ledger` and
    /// `post_turn_callback`.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
}
60
61impl PartialEq for WorkflowNode {
62 fn eq(&self, other: &Self) -> bool {
63 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
64 }
65}
66
67pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
68 match value {
69 serde_json::Value::Null => Vec::new(),
70 serde_json::Value::Array(items) => items
71 .iter()
72 .filter_map(|item| match item {
73 serde_json::Value::Object(map) => map
74 .get("name")
75 .and_then(|value| value.as_str())
76 .filter(|name| !name.is_empty())
77 .map(|name| name.to_string()),
78 _ => None,
79 })
80 .collect(),
81 serde_json::Value::Object(map) => {
82 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
83 return map
84 .get("tools")
85 .map(workflow_tool_names)
86 .unwrap_or_default();
87 }
88 map.get("name")
89 .and_then(|value| value.as_str())
90 .filter(|name| !name.is_empty())
91 .map(|name| vec![name.to_string()])
92 .unwrap_or_default()
93 }
94 _ => Vec::new(),
95 }
96}
97
/// Returns the most severe side-effect level among `levels`, or `None` if empty.
///
/// Severity increases through "none", "read_only", "workspace_write",
/// "process_exec" and "network". Unrecognised names are treated as more severe
/// than all of them. Among equally severe entries the last one wins.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const ORDERED: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    let severity = |level: &String| {
        ORDERED
            .iter()
            .position(|known| *known == level.as_str())
            .unwrap_or(ORDERED.len())
    };
    levels.max_by_key(severity)
}
111
112fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
113 match value.and_then(|v| v.as_str()).unwrap_or("") {
114 "read" => ToolKind::Read,
115 "edit" => ToolKind::Edit,
116 "delete" => ToolKind::Delete,
117 "move" => ToolKind::Move,
118 "search" => ToolKind::Search,
119 "execute" => ToolKind::Execute,
120 "think" => ToolKind::Think,
121 "fetch" => ToolKind::Fetch,
122 _ => ToolKind::Other,
123 }
124}
125
126fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
127 let policy = map
128 .get("policy")
129 .and_then(|value| value.as_object())
130 .cloned()
131 .unwrap_or_default();
132
133 let capabilities = policy
134 .get("capabilities")
135 .and_then(|value| value.as_object())
136 .map(|caps| {
137 caps.iter()
138 .map(|(capability, ops)| {
139 let values = ops
140 .as_array()
141 .map(|items| {
142 items
143 .iter()
144 .filter_map(|item| item.as_str().map(|s| s.to_string()))
145 .collect::<Vec<_>>()
146 })
147 .unwrap_or_default();
148 (capability.clone(), values)
149 })
150 .collect::<BTreeMap<_, _>>()
151 })
152 .unwrap_or_default();
153
154 let arg_schema = if let Some(schema) = policy.get("arg_schema") {
157 serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
158 } else {
159 ToolArgSchema {
160 path_params: policy
161 .get("path_params")
162 .and_then(|value| value.as_array())
163 .map(|items| {
164 items
165 .iter()
166 .filter_map(|item| item.as_str().map(|s| s.to_string()))
167 .collect::<Vec<_>>()
168 })
169 .unwrap_or_default(),
170 arg_aliases: policy
171 .get("arg_aliases")
172 .and_then(|value| value.as_object())
173 .map(|aliases| {
174 aliases
175 .iter()
176 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
177 .collect::<BTreeMap<_, _>>()
178 })
179 .unwrap_or_default(),
180 required: policy
181 .get("required")
182 .and_then(|value| value.as_array())
183 .map(|items| {
184 items
185 .iter()
186 .filter_map(|item| item.as_str().map(|s| s.to_string()))
187 .collect::<Vec<_>>()
188 })
189 .unwrap_or_default(),
190 }
191 };
192
193 let kind = parse_tool_kind(policy.get("kind"));
194 let side_effect_level = policy
195 .get("side_effect_level")
196 .and_then(|value| value.as_str())
197 .map(SideEffectLevel::parse)
198 .unwrap_or_default();
199
200 ToolAnnotations {
201 kind,
202 side_effect_level,
203 arg_schema,
204 capabilities,
205 }
206}
207
208pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
209 match value {
210 serde_json::Value::Null => BTreeMap::new(),
211 serde_json::Value::Array(items) => items
212 .iter()
213 .filter_map(|item| match item {
214 serde_json::Value::Object(map) => map
215 .get("name")
216 .and_then(|value| value.as_str())
217 .filter(|name| !name.is_empty())
218 .map(|name| (name.to_string(), parse_tool_annotations(map))),
219 _ => None,
220 })
221 .collect(),
222 serde_json::Value::Object(map) => {
223 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
224 return map
225 .get("tools")
226 .map(workflow_tool_annotations)
227 .unwrap_or_default();
228 }
229 map.get("name")
230 .and_then(|value| value.as_str())
231 .filter(|name| !name.is_empty())
232 .map(|name| {
233 let mut annotations = BTreeMap::new();
234 annotations.insert(name.to_string(), parse_tool_annotations(map));
235 annotations
236 })
237 .unwrap_or_default()
238 }
239 _ => BTreeMap::new(),
240 }
241}
242
243pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
244 let tools = workflow_tool_names(value);
245 let tool_annotations = workflow_tool_annotations(value);
246 let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
247 for annotations in tool_annotations.values() {
248 for (capability, ops) in &annotations.capabilities {
249 let entry = capabilities.entry(capability.clone()).or_default();
250 for op in ops {
251 if !entry.contains(op) {
252 entry.push(op.clone());
253 }
254 }
255 entry.sort();
256 }
257 }
258 let side_effect_level = max_side_effect_level(
259 tool_annotations
260 .values()
261 .map(|annotations| annotations.side_effect_level.as_str().to_string())
262 .filter(|level| level != "none"),
263 );
264 CapabilityPolicy {
265 tools,
266 capabilities,
267 workspace_roots: Vec::new(),
268 side_effect_level,
269 recursion_limit: None,
270 tool_arg_constraints: Vec::new(),
271 tool_annotations,
272 }
273}
274
/// A directed edge between two workflow nodes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Source node id.
    pub from: String,
    /// Target node id.
    pub to: String,
    /// Branch this edge is taken on.
    ///
    /// Examples are "true"/"false" for condition nodes and "failed"/"retry" in the
    /// synthesized verify/repair loop. `None` marks an unconditional edge, which
    /// `next_nodes_for` falls back to when no edge matches the requested branch.
    pub branch: Option<String>,
    /// Optional label; not interpreted by the code in this module.
    pub label: Option<String>,
}
283
/// A workflow: a set of nodes connected by (optionally branch-labelled) edges.
///
/// Because of the struct-level `#[serde(default)]`, missing fields take their
/// `Default` values. `normalize_workflow_value` then fills in `_type`, `id`,
/// `version` and `entry`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Type tag, serialized as `_type`; normalized to "workflow_graph" when empty.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph id; a fresh one is generated during normalization when empty.
    pub id: String,
    pub name: Option<String>,
    /// Graph version; 0 is normalized to 1.
    pub version: usize,
    /// Id of the entry node; reachability in `validate_workflow` starts here.
    pub entry: String,
    /// Nodes keyed by id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    /// Graph-wide capability policy; checked against the ceiling in `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Audit records, appended by `append_audit_entry`.
    pub audit_log: Vec<WorkflowAuditEntry>,
}
300
/// One entry of a graph's audit log, as created by `append_audit_entry`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    /// Id generated with `new_id("audit")`.
    pub id: String,
    /// Name of the recorded operation.
    pub op: String,
    /// Node the operation concerns, if any.
    pub node_id: Option<String>,
    /// Creation time, produced by `now_rfc3339()`.
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
311
/// Result of `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// `true` exactly when `errors` is empty.
    pub valid: bool,
    pub errors: Vec<String>,
    /// Non-fatal findings; they do not affect `valid`.
    pub warnings: Vec<String>,
    /// Ids visited from the entry by following edges, in sorted order.
    pub reachable_nodes: Vec<String>,
}
320
321pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
322 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
323 let dict = value.as_dict();
324 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
325 node.raw_transcript_policy = dict.and_then(|d| d.get("transcript_policy")).cloned();
326 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
327 Ok(node)
328}
329
330pub fn parse_workflow_node_json(
331 json: serde_json::Value,
332 label: &str,
333) -> Result<WorkflowNode, VmError> {
334 super::parse_json_payload(json, label)
335}
336
337pub fn parse_workflow_edge_json(
338 json: serde_json::Value,
339 label: &str,
340) -> Result<WorkflowEdge, VmError> {
341 super::parse_json_payload(json, label)
342}
343
344pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
345 let mut graph: WorkflowGraph = super::parse_json_value(value)?;
346 let as_dict = value.as_dict().cloned().unwrap_or_default();
347
348 if graph.nodes.is_empty() {
349 for key in ["act", "verify", "repair"] {
350 if let Some(node_value) = as_dict.get(key) {
351 let mut node = parse_workflow_node_value(node_value, "orchestration")?;
352 let raw_node = node_value.as_dict().cloned().unwrap_or_default();
353 node.id = Some(key.to_string());
354 if node.kind.is_empty() {
355 node.kind = if key == "verify" {
356 "verify".to_string()
357 } else {
358 "stage".to_string()
359 };
360 }
361 if node.model_policy.provider.is_none() {
362 node.model_policy.provider = as_dict
363 .get("provider")
364 .map(|value| value.display())
365 .filter(|value| !value.is_empty());
366 }
367 if node.model_policy.model.is_none() {
368 node.model_policy.model = as_dict
369 .get("model")
370 .map(|value| value.display())
371 .filter(|value| !value.is_empty());
372 }
373 if node.model_policy.model_tier.is_none() {
374 node.model_policy.model_tier = as_dict
375 .get("model_tier")
376 .or_else(|| as_dict.get("tier"))
377 .map(|value| value.display())
378 .filter(|value| !value.is_empty());
379 }
380 if node.model_policy.temperature.is_none() {
381 node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
382 if let VmValue::Float(number) = value {
383 Some(*number)
384 } else {
385 value.as_int().map(|number| number as f64)
386 }
387 });
388 }
389 if node.model_policy.max_tokens.is_none() {
390 node.model_policy.max_tokens =
391 as_dict.get("max_tokens").and_then(|value| value.as_int());
392 }
393 if node.mode.is_none() {
394 node.mode = as_dict
395 .get("mode")
396 .map(|value| value.display())
397 .filter(|value| !value.is_empty());
398 }
399 if node.done_sentinel.is_none() {
400 node.done_sentinel = as_dict
401 .get("done_sentinel")
402 .map(|value| value.display())
403 .filter(|value| !value.is_empty());
404 }
405 if key == "verify"
406 && node.verify.is_none()
407 && (raw_node.contains_key("assert_text")
408 || raw_node.contains_key("command")
409 || raw_node.contains_key("expect_status")
410 || raw_node.contains_key("expect_text"))
411 {
412 node.verify = Some(serde_json::json!({
413 "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
414 "command": raw_node.get("command").map(vm_value_to_json),
415 "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
416 "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
417 }));
418 }
419 graph.nodes.insert(key.to_string(), node);
420 }
421 }
422 if graph.entry.is_empty() && graph.nodes.contains_key("act") {
423 graph.entry = "act".to_string();
424 }
425 if graph.edges.is_empty() && graph.nodes.contains_key("act") {
426 if graph.nodes.contains_key("verify") {
427 graph.edges.push(WorkflowEdge {
428 from: "act".to_string(),
429 to: "verify".to_string(),
430 branch: None,
431 label: None,
432 });
433 }
434 if graph.nodes.contains_key("repair") {
435 graph.edges.push(WorkflowEdge {
436 from: "verify".to_string(),
437 to: "repair".to_string(),
438 branch: Some("failed".to_string()),
439 label: None,
440 });
441 graph.edges.push(WorkflowEdge {
442 from: "repair".to_string(),
443 to: "verify".to_string(),
444 branch: Some("retry".to_string()),
445 label: None,
446 });
447 }
448 }
449 }
450
451 if graph.type_name.is_empty() {
452 graph.type_name = "workflow_graph".to_string();
453 }
454 if graph.id.is_empty() {
455 graph.id = new_id("workflow");
456 }
457 if graph.version == 0 {
458 graph.version = 1;
459 }
460 if graph.entry.is_empty() {
461 graph.entry = graph
462 .nodes
463 .keys()
464 .next()
465 .cloned()
466 .unwrap_or_else(|| "act".to_string());
467 }
468 for (node_id, node) in &mut graph.nodes {
469 if node.raw_tools.is_none() {
470 node.raw_tools = as_dict
471 .get("nodes")
472 .and_then(|nodes| nodes.as_dict())
473 .and_then(|nodes| nodes.get(node_id))
474 .and_then(|node_value| node_value.as_dict())
475 .and_then(|raw_node| raw_node.get("tools"))
476 .cloned();
477 }
478 if node.id.is_none() {
479 node.id = Some(node_id.clone());
480 }
481 if node.kind.is_empty() {
482 node.kind = "stage".to_string();
483 }
484 if node.join_policy.strategy.is_empty() {
485 node.join_policy.strategy = "all".to_string();
486 }
487 if node.reduce_policy.strategy.is_empty() {
488 node.reduce_policy.strategy = "concat".to_string();
489 }
490 if node.output_contract.output_kinds.is_empty() {
491 node.output_contract.output_kinds = vec![match node.kind.as_str() {
492 "verify" => "verification_result".to_string(),
493 "reduce" => node
494 .reduce_policy
495 .output_kind
496 .clone()
497 .unwrap_or_else(|| "summary".to_string()),
498 "map" => node
499 .map_policy
500 .output_kind
501 .clone()
502 .unwrap_or_else(|| "artifact".to_string()),
503 "escalation" => "plan".to_string(),
504 _ => "artifact".to_string(),
505 }];
506 }
507 if node.retry_policy.max_attempts == 0 {
508 node.retry_policy.max_attempts = 1;
509 }
510 }
511 Ok(graph)
512}
513
514pub fn validate_workflow(
515 graph: &WorkflowGraph,
516 ceiling: Option<&CapabilityPolicy>,
517) -> WorkflowValidationReport {
518 let mut errors = Vec::new();
519 let mut warnings = Vec::new();
520
521 if !graph.nodes.contains_key(&graph.entry) {
522 errors.push(format!("entry node does not exist: {}", graph.entry));
523 }
524
525 let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
526 for edge in &graph.edges {
527 if !node_ids.contains(&edge.from) {
528 errors.push(format!("edge.from references unknown node: {}", edge.from));
529 }
530 if !node_ids.contains(&edge.to) {
531 errors.push(format!("edge.to references unknown node: {}", edge.to));
532 }
533 }
534
535 let reachable_nodes = reachable_nodes(graph);
536 for node_id in &node_ids {
537 if !reachable_nodes.contains(node_id) {
538 warnings.push(format!("node is unreachable: {node_id}"));
539 }
540 }
541
542 for (node_id, node) in &graph.nodes {
543 let incoming = graph
544 .edges
545 .iter()
546 .filter(|edge| edge.to == *node_id)
547 .count();
548 let outgoing: Vec<&WorkflowEdge> = graph
549 .edges
550 .iter()
551 .filter(|edge| edge.from == *node_id)
552 .collect();
553 if let Some(min_inputs) = node.input_contract.min_inputs {
554 if let Some(max_inputs) = node.input_contract.max_inputs {
555 if min_inputs > max_inputs {
556 errors.push(format!(
557 "node {node_id}: input contract min_inputs exceeds max_inputs"
558 ));
559 }
560 }
561 }
562 match node.kind.as_str() {
563 "condition" => {
564 let has_true = outgoing
565 .iter()
566 .any(|edge| edge.branch.as_deref() == Some("true"));
567 let has_false = outgoing
568 .iter()
569 .any(|edge| edge.branch.as_deref() == Some("false"));
570 if !has_true || !has_false {
571 errors.push(format!(
572 "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
573 ));
574 }
575 }
576 "fork" => {
577 if outgoing.len() < 2 {
578 errors.push(format!(
579 "node {node_id}: fork nodes require at least two outgoing edges"
580 ));
581 }
582 }
583 "join" => {
584 if incoming < 2 {
585 warnings.push(format!(
586 "node {node_id}: join node has fewer than two incoming edges"
587 ));
588 }
589 }
590 "map" => {
591 if node.map_policy.items.is_empty()
592 && node.map_policy.item_artifact_kind.is_none()
593 && node.input_contract.input_kinds.is_empty()
594 {
595 errors.push(format!(
596 "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
597 ));
598 }
599 }
600 "reduce" => {
601 if node.input_contract.input_kinds.is_empty() {
602 warnings.push(format!(
603 "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
604 ));
605 }
606 }
607 _ => {}
608 }
609 }
610
611 if let Some(ceiling) = ceiling {
612 if let Err(error) = ceiling.intersect(&graph.capability_policy) {
613 errors.push(error);
614 }
615 for (node_id, node) in &graph.nodes {
616 if let Err(error) = ceiling.intersect(&node.capability_policy) {
617 errors.push(format!("node {node_id}: {error}"));
618 }
619 }
620 }
621
622 WorkflowValidationReport {
623 valid: errors.is_empty(),
624 errors,
625 warnings,
626 reachable_nodes: reachable_nodes.into_iter().collect(),
627 }
628}
629
630fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
631 let mut seen = BTreeSet::new();
632 let mut stack = vec![graph.entry.clone()];
633 while let Some(node_id) = stack.pop() {
634 if !seen.insert(node_id.clone()) {
635 continue;
636 }
637 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
638 stack.push(edge.to.clone());
639 }
640 }
641 seen
642}
643
/// Executes a single stage node of a workflow.
///
/// Steps:
/// 1. Selects input artifacts using the node's context policy. When that policy
///    names no kinds, `input_contract.input_kinds` is used instead. The selected
///    artifacts are rendered into the prompt context.
/// 2. Applies the input transcript policy and enforces the input contract
///    (`require_transcript`, `min_inputs`, `max_inputs`).
/// 3. Produces a result payload:
///    - "verify" nodes run `verify.command` (if present and non-blank) in a shell
///      and report its output. Without a command they return an empty completed
///      result.
///    - Nodes in "agent" mode, or with at least one named tool, run the agent loop.
///      The loop uses the intersection of the tools' derived policy and the node's
///      capability policy.
///    - All other nodes make a single LLM call.
/// 4. Adds prompt, context and selection details to the payload, applies the
///    output transcript policy, and wraps the text in one output artifact.
///
/// Returns `(payload, vec![artifact], output_transcript)`.
///
/// # Errors
/// Fails when:
/// - the transcript or input-count contract is violated;
/// - the verify command cannot be spawned;
/// - LLM option extraction fails;
/// - the tool and node capability policies do not intersect;
/// - the LLM or agent-loop call fails.
pub async fn execute_stage_node(
    node_id: &str,
    node: &WorkflowNode,
    task: &str,
    artifacts: &[ArtifactRecord],
    transcript: Option<VmValue>,
) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
    // Let the input contract's kinds drive selection when the context policy names none.
    let mut selection_policy = node.context_policy.clone();
    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
    }
    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
    // NOTE(review): rendering uses node.context_policy rather than selection_policy;
    // the two differ only in include_kinds. Confirm this is intended.
    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
    let input_transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
    if node.input_contract.require_transcript && input_transcript.is_none() {
        return Err(VmError::Runtime(format!(
            "workflow stage {node_id} requires transcript input"
        )));
    }
    if let Some(min_inputs) = node.input_contract.min_inputs {
        if selected.len() < min_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
            )));
        }
    }
    if let Some(max_inputs) = node.input_contract.max_inputs {
        if selected.len() > max_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
            )));
        }
    }
    let prompt = super::render_workflow_prompt(task, node.task_label.as_deref(), &rendered_context);

    // A non-blank HARN_AGENT_TOOL_FORMAT overrides the tool-call format. Otherwise it
    // is derived from HARN_LLM_MODEL / HARN_LLM_PROVIDER.
    let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
        .ok()
        .filter(|value| !value.trim().is_empty())
        .unwrap_or_else(|| {
            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
            crate::llm_config::default_tool_format(&model, &provider)
        });
    let mut llm_result = if node.kind == "verify" {
        if let Some(command) = node
            .verify
            .as_ref()
            .and_then(|verify| verify.as_object())
            .and_then(|verify| verify.get("command"))
            .and_then(|value| value.as_str())
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            // Run the command through the platform shell: `cmd /C` on Windows,
            // `/bin/sh -lc` elsewhere.
            let mut process = if cfg!(target_os = "windows") {
                let mut cmd = tokio::process::Command::new("cmd");
                cmd.arg("/C").arg(command);
                cmd
            } else {
                let mut cmd = tokio::process::Command::new("/bin/sh");
                cmd.arg("-lc").arg(command);
                cmd
            };
            process.stdin(std::process::Stdio::null());
            // Inherit the working directory and extra environment of the current
            // execution context, when one is set.
            if let Some(context) = crate::stdlib::process::current_execution_context() {
                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
                    process.current_dir(cwd);
                }
                if !context.env.is_empty() {
                    process.envs(context.env);
                }
            }
            // NOTE(review): no timeout is applied to the verify command.
            let output = process
                .output()
                .await
                .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
            // Combined text: stdout and stderr joined by a newline when both are non-empty.
            let combined = if stderr.is_empty() {
                stdout.clone()
            } else if stdout.is_empty() {
                stderr.clone()
            } else {
                format!("{stdout}\n{stderr}")
            };
            // A failing command is not an error here. The outcome is reported via
            // "success" and "exit_status"; the latter is -1 when no exit code exists.
            serde_json::json!({
                "status": "completed",
                "text": combined,
                "visible_text": combined,
                "command": command,
                "stdout": stdout,
                "stderr": stderr,
                "exit_status": output.status.code().unwrap_or(-1),
                "success": output.status.success(),
            })
        } else {
            serde_json::json!({
                "status": "completed",
                "text": "",
                "visible_text": "",
            })
        }
    } else {
        // Translate the node's model policy into LLM call options.
        let mut options = BTreeMap::new();
        if let Some(provider) = &node.model_policy.provider {
            options.insert(
                "provider".to_string(),
                VmValue::String(Rc::from(provider.clone())),
            );
        }
        if let Some(model) = &node.model_policy.model {
            options.insert(
                "model".to_string(),
                VmValue::String(Rc::from(model.clone())),
            );
        }
        if let Some(model_tier) = &node.model_policy.model_tier {
            options.insert(
                "model_tier".to_string(),
                VmValue::String(Rc::from(model_tier.clone())),
            );
        }
        if let Some(temperature) = node.model_policy.temperature {
            options.insert("temperature".to_string(), VmValue::Float(temperature));
        }
        if let Some(max_tokens) = node.model_policy.max_tokens {
            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
        }
        // Prefer the original VM tools value over the JSON form. Tools are only
        // passed when at least one named tool exists.
        let tool_names = workflow_tool_names(&node.tools);
        let tools_value = node.raw_tools.clone().or_else(|| {
            if matches!(node.tools, serde_json::Value::Null) {
                None
            } else {
                Some(crate::stdlib::json_to_vm_value(&node.tools))
            }
        });
        if tools_value.is_some() && !tool_names.is_empty() {
            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
        }
        if let Some(transcript) = input_transcript.clone() {
            options.insert("transcript".to_string(), transcript);
        }

        // Positional arguments: prompt, system prompt (or nil), options dict.
        let args = vec![
            VmValue::String(Rc::from(prompt.clone())),
            node.system
                .clone()
                .map(|s| VmValue::String(Rc::from(s)))
                .unwrap_or(VmValue::Nil),
            VmValue::Dict(Rc::new(options)),
        ];
        let mut opts = extract_llm_options(&args)?;

        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
            // The agent loop runs under the intersection of what the tools declare and
            // what the node permits; a failed intersection aborts the stage.
            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
            let effective_policy = tool_policy
                .intersect(&node.capability_policy)
                .map_err(VmError::Runtime)?;
            // Auto-compaction config from the transcript policy. Strategy names that
            // fail to parse are ignored and leave the defaults in place.
            let auto_compact = if node.transcript_policy.auto_compact {
                let mut ac = crate::orchestration::AutoCompactConfig::default();
                if let Some(v) = node.transcript_policy.compact_threshold {
                    ac.token_threshold = v;
                }
                if let Some(v) = node.transcript_policy.tool_output_max_chars {
                    ac.tool_output_max_chars = v;
                }
                if let Some(ref strategy) = node.transcript_policy.compact_strategy {
                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
                        ac.compact_strategy = s;
                    }
                }
                if let Some(v) = node.transcript_policy.hard_limit_tokens {
                    ac.hard_limit_tokens = Some(v);
                }
                if let Some(ref strategy) = node.transcript_policy.hard_limit_strategy {
                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
                        ac.hard_limit_strategy = s;
                    }
                }
                // Callbacks are VM values, so they are read from the raw transcript
                // policy rather than the deserialized struct.
                if let Some(ref raw_tp) = node.raw_transcript_policy {
                    if let Some(dict) = raw_tp.as_dict() {
                        if let Some(cb) = dict.get("compress_callback") {
                            ac.compress_callback = Some(cb.clone());
                        }
                        if let Some(cb) = dict.get("mask_callback") {
                            ac.mask_callback = Some(cb.clone());
                        }
                    }
                }
                {
                    // Tell the provider adaptation which limits the user set
                    // explicitly.
                    let user_specified_threshold =
                        node.transcript_policy.compact_threshold.is_some();
                    let user_specified_hard_limit =
                        node.transcript_policy.hard_limit_tokens.is_some();
                    crate::llm::api::adapt_auto_compact_to_provider(
                        &mut ac,
                        user_specified_threshold,
                        user_specified_hard_limit,
                        &opts.provider,
                        &opts.model,
                        &opts.api_key,
                    )
                    .await;
                }
                Some(ac)
            } else {
                None
            };
            crate::llm::run_agent_loop_internal(
                &mut opts,
                crate::llm::AgentLoopConfig {
                    persistent: true,
                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
                    nudge: node.model_policy.nudge.clone(),
                    done_sentinel: node.done_sentinel.clone(),
                    break_unless_phase: None,
                    tool_retries: 0,
                    tool_backoff_ms: 1000,
                    tool_format: tool_format.clone(),
                    auto_compact,
                    policy: Some(effective_policy),
                    approval_policy: Some(node.approval_policy.clone()),
                    daemon: false,
                    daemon_config: Default::default(),
                    llm_retries: 2,
                    llm_backoff_ms: 2000,
                    exit_when_verified: node.exit_when_verified,
                    loop_detect_warn: 2,
                    loop_detect_block: 3,
                    loop_detect_skip: 4,
                    tool_examples: node.model_policy.tool_examples.clone(),
                    turn_policy: node.model_policy.turn_policy.clone(),
                    stop_after_successful_tools: node
                        .model_policy
                        .stop_after_successful_tools
                        .clone(),
                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
                    // Use a non-blank string `session_id` from the raw model policy;
                    // otherwise generate "workflow_stage_<uuid v7>".
                    session_id: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("session_id"))
                        .and_then(|v| match v {
                            crate::value::VmValue::String(s) if !s.trim().is_empty() => {
                                Some(s.to_string())
                            }
                            _ => None,
                        })
                        .unwrap_or_else(|| format!("workflow_stage_{}", uuid::Uuid::now_v7())),
                    event_sink: None,
                    // Task ledger from the raw model policy. It falls back to the
                    // default when absent or when it does not deserialize.
                    task_ledger: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("task_ledger"))
                        .map(crate::llm::helpers::vm_value_to_json)
                        .and_then(|json| serde_json::from_value(json).ok())
                        .unwrap_or_default(),
                    // Only closure values are accepted as a post-turn callback.
                    post_turn_callback: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("post_turn_callback"))
                        .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
                        .cloned(),
                },
            )
            .await?
        } else {
            let result = vm_call_llm_full(&opts).await?;
            crate::llm::agent_loop_result_from_llm(&result, opts)
        }
    };
    // Record what the stage saw: prompt, system prompt, rendered context, selected
    // artifacts and the tool-calling format.
    if let Some(payload) = llm_result.as_object_mut() {
        payload.insert("prompt".to_string(), serde_json::json!(prompt));
        payload.insert(
            "system_prompt".to_string(),
            serde_json::json!(node.system.clone().unwrap_or_default()),
        );
        payload.insert(
            "rendered_context".to_string(),
            serde_json::json!(rendered_context),
        );
        payload.insert(
            "selected_artifact_ids".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.id.clone())
                .collect::<Vec<_>>()),
        );
        payload.insert(
            "selected_artifact_titles".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.title.clone())
                .collect::<Vec<_>>()),
        );
        payload.insert(
            "tool_calling_mode".to_string(),
            serde_json::json!(tool_format.clone()),
        );
    }

    // A missing or non-string "text" yields an empty string.
    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
    // Prefer the transcript returned by the call; otherwise keep the input transcript.
    let result_transcript = llm_result
        .get("transcript")
        .cloned()
        .map(|value| crate::stdlib::json_to_vm_value(&value));
    let transcript = apply_output_transcript_policy(
        result_transcript.or(input_transcript),
        &node.transcript_policy,
    );
    let output_kind = node
        .output_contract
        .output_kinds
        .first()
        .cloned()
        .unwrap_or_else(|| {
            if node.kind == "verify" {
                "verification_result".to_string()
            } else {
                "artifact".to_string()
            }
        });
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "input_artifact_ids".to_string(),
        serde_json::json!(selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect::<Vec<_>>()),
    );
    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
    // The output artifact carries the full payload as `data` and lists the selected
    // inputs as its lineage.
    let artifact = ArtifactRecord {
        type_name: "artifact".to_string(),
        id: new_id("artifact"),
        kind: output_kind,
        title: Some(format!("stage {node_id} output")),
        text: Some(visible_text),
        data: Some(llm_result.clone()),
        source: Some(node_id.to_string()),
        created_at: now_rfc3339(),
        freshness: Some("fresh".to_string()),
        priority: None,
        lineage: selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect(),
        relevance: Some(1.0),
        estimated_tokens: None,
        stage: Some(node_id.to_string()),
        metadata,
    }
    .normalize();

    Ok((llm_result, vec![artifact], transcript))
}
1012
1013pub fn next_nodes_for(
1014 graph: &WorkflowGraph,
1015 current: &str,
1016 branch: Option<&str>,
1017) -> Vec<WorkflowEdge> {
1018 let mut matching: Vec<WorkflowEdge> = graph
1019 .edges
1020 .iter()
1021 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1022 .cloned()
1023 .collect();
1024 if matching.is_empty() {
1025 matching = graph
1026 .edges
1027 .iter()
1028 .filter(|edge| edge.from == current && edge.branch.is_none())
1029 .cloned()
1030 .collect();
1031 }
1032 matching
1033}
1034
1035pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1036 next_nodes_for(graph, current, Some(branch))
1037 .into_iter()
1038 .next()
1039 .map(|edge| edge.to)
1040}
1041
1042pub fn append_audit_entry(
1043 graph: &mut WorkflowGraph,
1044 op: &str,
1045 node_id: Option<String>,
1046 reason: Option<String>,
1047 metadata: BTreeMap<String, serde_json::Value>,
1048) {
1049 graph.audit_log.push(WorkflowAuditEntry {
1050 id: new_id("audit"),
1051 op: op.to_string(),
1052 node_id,
1053 timestamp: now_rfc3339(),
1054 reason,
1055 metadata,
1056 });
1057}