1use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 apply_input_transcript_policy, apply_output_transcript_policy, new_id, now_rfc3339,
10 ArtifactRecord, BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy,
11 MapPolicy, ModelPolicy, ReducePolicy, RetryPolicy, StageContract, TranscriptPolicy,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
17#[derive(Clone, Debug, Default, Serialize, Deserialize)]
18#[serde(default)]
19pub struct WorkflowNode {
20 pub id: Option<String>,
21 pub kind: String,
22 pub mode: Option<String>,
23 pub prompt: Option<String>,
24 pub system: Option<String>,
25 pub task_label: Option<String>,
26 pub done_sentinel: Option<String>,
27 pub tools: serde_json::Value,
28 pub model_policy: ModelPolicy,
29 pub transcript_policy: TranscriptPolicy,
30 pub context_policy: ContextPolicy,
31 pub retry_policy: RetryPolicy,
32 pub capability_policy: CapabilityPolicy,
33 pub approval_policy: super::ToolApprovalPolicy,
34 pub input_contract: StageContract,
35 pub output_contract: StageContract,
36 pub branch_semantics: BranchSemantics,
37 pub map_policy: MapPolicy,
38 pub join_policy: JoinPolicy,
39 pub reduce_policy: ReducePolicy,
40 pub escalation_policy: EscalationPolicy,
41 pub verify: Option<serde_json::Value>,
42 #[serde(default)]
47 pub exit_when_verified: bool,
48 pub metadata: BTreeMap<String, serde_json::Value>,
49 #[serde(skip)]
50 pub raw_tools: Option<VmValue>,
51 #[serde(skip)]
54 pub raw_transcript_policy: Option<VmValue>,
55 #[serde(skip)]
58 pub raw_model_policy: Option<VmValue>,
59}
60
61impl PartialEq for WorkflowNode {
62 fn eq(&self, other: &Self) -> bool {
63 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
64 }
65}
66
67pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
68 match value {
69 serde_json::Value::Null => Vec::new(),
70 serde_json::Value::Array(items) => items
71 .iter()
72 .filter_map(|item| match item {
73 serde_json::Value::Object(map) => map
74 .get("name")
75 .and_then(|value| value.as_str())
76 .filter(|name| !name.is_empty())
77 .map(|name| name.to_string()),
78 _ => None,
79 })
80 .collect(),
81 serde_json::Value::Object(map) => {
82 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
83 return map
84 .get("tools")
85 .map(workflow_tool_names)
86 .unwrap_or_default();
87 }
88 map.get("name")
89 .and_then(|value| value.as_str())
90 .filter(|name| !name.is_empty())
91 .map(|name| vec![name.to_string()])
92 .unwrap_or_default()
93 }
94 _ => Vec::new(),
95 }
96}
97
/// Returns the most severe side-effect level among `levels`, or `None` for an empty input.
///
/// Known levels are ordered `none < read_only < workspace_write < process_exec < network`;
/// any other string ranks above all of them. When several levels share the highest rank,
/// the last one encountered is returned.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    fn severity(level: &str) -> usize {
        const ORDER: [&str; 5] = [
            "none",
            "read_only",
            "workspace_write",
            "process_exec",
            "network",
        ];
        ORDER
            .iter()
            .position(|known| *known == level)
            .unwrap_or(ORDER.len())
    }

    levels.fold(None, |best: Option<String>, candidate| match best {
        // Keep the current best only when it is strictly more severe, so ties go to
        // the later element.
        Some(current) if severity(&current) > severity(&candidate) => Some(current),
        _ => Some(candidate),
    })
}
111
112fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
113 match value.and_then(|v| v.as_str()).unwrap_or("") {
114 "read" => ToolKind::Read,
115 "edit" => ToolKind::Edit,
116 "delete" => ToolKind::Delete,
117 "move" => ToolKind::Move,
118 "search" => ToolKind::Search,
119 "execute" => ToolKind::Execute,
120 "think" => ToolKind::Think,
121 "fetch" => ToolKind::Fetch,
122 _ => ToolKind::Other,
123 }
124}
125
126fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
127 let policy = map
128 .get("policy")
129 .and_then(|value| value.as_object())
130 .cloned()
131 .unwrap_or_default();
132
133 let capabilities = policy
134 .get("capabilities")
135 .and_then(|value| value.as_object())
136 .map(|caps| {
137 caps.iter()
138 .map(|(capability, ops)| {
139 let values = ops
140 .as_array()
141 .map(|items| {
142 items
143 .iter()
144 .filter_map(|item| item.as_str().map(|s| s.to_string()))
145 .collect::<Vec<_>>()
146 })
147 .unwrap_or_default();
148 (capability.clone(), values)
149 })
150 .collect::<BTreeMap<_, _>>()
151 })
152 .unwrap_or_default();
153
154 let arg_schema = if let Some(schema) = policy.get("arg_schema") {
159 serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
160 } else {
161 ToolArgSchema {
162 path_params: policy
163 .get("path_params")
164 .and_then(|value| value.as_array())
165 .map(|items| {
166 items
167 .iter()
168 .filter_map(|item| item.as_str().map(|s| s.to_string()))
169 .collect::<Vec<_>>()
170 })
171 .unwrap_or_default(),
172 arg_aliases: policy
173 .get("arg_aliases")
174 .and_then(|value| value.as_object())
175 .map(|aliases| {
176 aliases
177 .iter()
178 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
179 .collect::<BTreeMap<_, _>>()
180 })
181 .unwrap_or_default(),
182 required: policy
183 .get("required")
184 .and_then(|value| value.as_array())
185 .map(|items| {
186 items
187 .iter()
188 .filter_map(|item| item.as_str().map(|s| s.to_string()))
189 .collect::<Vec<_>>()
190 })
191 .unwrap_or_default(),
192 }
193 };
194
195 let kind = parse_tool_kind(policy.get("kind"));
196 let side_effect_level = policy
197 .get("side_effect_level")
198 .and_then(|value| value.as_str())
199 .map(SideEffectLevel::parse)
200 .unwrap_or_default();
201
202 ToolAnnotations {
203 kind,
204 side_effect_level,
205 arg_schema,
206 capabilities,
207 }
208}
209
210pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
211 match value {
212 serde_json::Value::Null => BTreeMap::new(),
213 serde_json::Value::Array(items) => items
214 .iter()
215 .filter_map(|item| match item {
216 serde_json::Value::Object(map) => map
217 .get("name")
218 .and_then(|value| value.as_str())
219 .filter(|name| !name.is_empty())
220 .map(|name| (name.to_string(), parse_tool_annotations(map))),
221 _ => None,
222 })
223 .collect(),
224 serde_json::Value::Object(map) => {
225 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
226 return map
227 .get("tools")
228 .map(workflow_tool_annotations)
229 .unwrap_or_default();
230 }
231 map.get("name")
232 .and_then(|value| value.as_str())
233 .filter(|name| !name.is_empty())
234 .map(|name| {
235 let mut annotations = BTreeMap::new();
236 annotations.insert(name.to_string(), parse_tool_annotations(map));
237 annotations
238 })
239 .unwrap_or_default()
240 }
241 _ => BTreeMap::new(),
242 }
243}
244
245pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
246 let tools = workflow_tool_names(value);
247 let tool_annotations = workflow_tool_annotations(value);
248 let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
249 for annotations in tool_annotations.values() {
250 for (capability, ops) in &annotations.capabilities {
251 let entry = capabilities.entry(capability.clone()).or_default();
252 for op in ops {
253 if !entry.contains(op) {
254 entry.push(op.clone());
255 }
256 }
257 entry.sort();
258 }
259 }
260 let side_effect_level = max_side_effect_level(
261 tool_annotations
262 .values()
263 .map(|annotations| annotations.side_effect_level.as_str().to_string())
264 .filter(|level| level != "none"),
265 );
266 CapabilityPolicy {
267 tools,
268 capabilities,
269 workspace_roots: Vec::new(),
270 side_effect_level,
271 recursion_limit: None,
272 tool_arg_constraints: Vec::new(),
273 tool_annotations,
274 }
275}
276
277#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
278#[serde(default)]
279pub struct WorkflowEdge {
280 pub from: String,
281 pub to: String,
282 pub branch: Option<String>,
283 pub label: Option<String>,
284}
285
286#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
287#[serde(default)]
288pub struct WorkflowGraph {
289 #[serde(rename = "_type")]
290 pub type_name: String,
291 pub id: String,
292 pub name: Option<String>,
293 pub version: usize,
294 pub entry: String,
295 pub nodes: BTreeMap<String, WorkflowNode>,
296 pub edges: Vec<WorkflowEdge>,
297 pub capability_policy: CapabilityPolicy,
298 pub approval_policy: super::ToolApprovalPolicy,
299 pub metadata: BTreeMap<String, serde_json::Value>,
300 pub audit_log: Vec<WorkflowAuditEntry>,
301}
302
303#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
304#[serde(default)]
305pub struct WorkflowAuditEntry {
306 pub id: String,
307 pub op: String,
308 pub node_id: Option<String>,
309 pub timestamp: String,
310 pub reason: Option<String>,
311 pub metadata: BTreeMap<String, serde_json::Value>,
312}
313
314#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
315#[serde(default)]
316pub struct WorkflowValidationReport {
317 pub valid: bool,
318 pub errors: Vec<String>,
319 pub warnings: Vec<String>,
320 pub reachable_nodes: Vec<String>,
321}
322
323pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
324 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
325 let dict = value.as_dict();
326 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
327 node.raw_transcript_policy = dict.and_then(|d| d.get("transcript_policy")).cloned();
328 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
329 Ok(node)
330}
331
332pub fn parse_workflow_node_json(
333 json: serde_json::Value,
334 label: &str,
335) -> Result<WorkflowNode, VmError> {
336 super::parse_json_payload(json, label)
337}
338
339pub fn parse_workflow_edge_json(
340 json: serde_json::Value,
341 label: &str,
342) -> Result<WorkflowEdge, VmError> {
343 super::parse_json_payload(json, label)
344}
345
346pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
347 let mut graph: WorkflowGraph = super::parse_json_value(value)?;
348 let as_dict = value.as_dict().cloned().unwrap_or_default();
349
350 if graph.nodes.is_empty() {
351 for key in ["act", "verify", "repair"] {
352 if let Some(node_value) = as_dict.get(key) {
353 let mut node = parse_workflow_node_value(node_value, "orchestration")?;
354 let raw_node = node_value.as_dict().cloned().unwrap_or_default();
355 node.id = Some(key.to_string());
356 if node.kind.is_empty() {
357 node.kind = if key == "verify" {
358 "verify".to_string()
359 } else {
360 "stage".to_string()
361 };
362 }
363 if node.model_policy.provider.is_none() {
364 node.model_policy.provider = as_dict
365 .get("provider")
366 .map(|value| value.display())
367 .filter(|value| !value.is_empty());
368 }
369 if node.model_policy.model.is_none() {
370 node.model_policy.model = as_dict
371 .get("model")
372 .map(|value| value.display())
373 .filter(|value| !value.is_empty());
374 }
375 if node.model_policy.model_tier.is_none() {
376 node.model_policy.model_tier = as_dict
377 .get("model_tier")
378 .or_else(|| as_dict.get("tier"))
379 .map(|value| value.display())
380 .filter(|value| !value.is_empty());
381 }
382 if node.model_policy.temperature.is_none() {
383 node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
384 if let VmValue::Float(number) = value {
385 Some(*number)
386 } else {
387 value.as_int().map(|number| number as f64)
388 }
389 });
390 }
391 if node.model_policy.max_tokens.is_none() {
392 node.model_policy.max_tokens =
393 as_dict.get("max_tokens").and_then(|value| value.as_int());
394 }
395 if node.mode.is_none() {
396 node.mode = as_dict
397 .get("mode")
398 .map(|value| value.display())
399 .filter(|value| !value.is_empty());
400 }
401 if node.done_sentinel.is_none() {
402 node.done_sentinel = as_dict
403 .get("done_sentinel")
404 .map(|value| value.display())
405 .filter(|value| !value.is_empty());
406 }
407 if key == "verify"
408 && node.verify.is_none()
409 && (raw_node.contains_key("assert_text")
410 || raw_node.contains_key("command")
411 || raw_node.contains_key("expect_status")
412 || raw_node.contains_key("expect_text"))
413 {
414 node.verify = Some(serde_json::json!({
415 "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
416 "command": raw_node.get("command").map(vm_value_to_json),
417 "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
418 "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
419 }));
420 }
421 graph.nodes.insert(key.to_string(), node);
422 }
423 }
424 if graph.entry.is_empty() && graph.nodes.contains_key("act") {
425 graph.entry = "act".to_string();
426 }
427 if graph.edges.is_empty() && graph.nodes.contains_key("act") {
428 if graph.nodes.contains_key("verify") {
429 graph.edges.push(WorkflowEdge {
430 from: "act".to_string(),
431 to: "verify".to_string(),
432 branch: None,
433 label: None,
434 });
435 }
436 if graph.nodes.contains_key("repair") {
437 graph.edges.push(WorkflowEdge {
438 from: "verify".to_string(),
439 to: "repair".to_string(),
440 branch: Some("failed".to_string()),
441 label: None,
442 });
443 graph.edges.push(WorkflowEdge {
444 from: "repair".to_string(),
445 to: "verify".to_string(),
446 branch: Some("retry".to_string()),
447 label: None,
448 });
449 }
450 }
451 }
452
453 if graph.type_name.is_empty() {
454 graph.type_name = "workflow_graph".to_string();
455 }
456 if graph.id.is_empty() {
457 graph.id = new_id("workflow");
458 }
459 if graph.version == 0 {
460 graph.version = 1;
461 }
462 if graph.entry.is_empty() {
463 graph.entry = graph
464 .nodes
465 .keys()
466 .next()
467 .cloned()
468 .unwrap_or_else(|| "act".to_string());
469 }
470 for (node_id, node) in &mut graph.nodes {
471 if node.raw_tools.is_none() {
472 node.raw_tools = as_dict
473 .get("nodes")
474 .and_then(|nodes| nodes.as_dict())
475 .and_then(|nodes| nodes.get(node_id))
476 .and_then(|node_value| node_value.as_dict())
477 .and_then(|raw_node| raw_node.get("tools"))
478 .cloned();
479 }
480 if node.id.is_none() {
481 node.id = Some(node_id.clone());
482 }
483 if node.kind.is_empty() {
484 node.kind = "stage".to_string();
485 }
486 if node.join_policy.strategy.is_empty() {
487 node.join_policy.strategy = "all".to_string();
488 }
489 if node.reduce_policy.strategy.is_empty() {
490 node.reduce_policy.strategy = "concat".to_string();
491 }
492 if node.output_contract.output_kinds.is_empty() {
493 node.output_contract.output_kinds = vec![match node.kind.as_str() {
494 "verify" => "verification_result".to_string(),
495 "reduce" => node
496 .reduce_policy
497 .output_kind
498 .clone()
499 .unwrap_or_else(|| "summary".to_string()),
500 "map" => node
501 .map_policy
502 .output_kind
503 .clone()
504 .unwrap_or_else(|| "artifact".to_string()),
505 "escalation" => "plan".to_string(),
506 _ => "artifact".to_string(),
507 }];
508 }
509 if node.retry_policy.max_attempts == 0 {
510 node.retry_policy.max_attempts = 1;
511 }
512 }
513 Ok(graph)
514}
515
516pub fn validate_workflow(
517 graph: &WorkflowGraph,
518 ceiling: Option<&CapabilityPolicy>,
519) -> WorkflowValidationReport {
520 let mut errors = Vec::new();
521 let mut warnings = Vec::new();
522
523 if !graph.nodes.contains_key(&graph.entry) {
524 errors.push(format!("entry node does not exist: {}", graph.entry));
525 }
526
527 let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
528 for edge in &graph.edges {
529 if !node_ids.contains(&edge.from) {
530 errors.push(format!("edge.from references unknown node: {}", edge.from));
531 }
532 if !node_ids.contains(&edge.to) {
533 errors.push(format!("edge.to references unknown node: {}", edge.to));
534 }
535 }
536
537 let reachable_nodes = reachable_nodes(graph);
538 for node_id in &node_ids {
539 if !reachable_nodes.contains(node_id) {
540 warnings.push(format!("node is unreachable: {node_id}"));
541 }
542 }
543
544 for (node_id, node) in &graph.nodes {
545 let incoming = graph
546 .edges
547 .iter()
548 .filter(|edge| edge.to == *node_id)
549 .count();
550 let outgoing: Vec<&WorkflowEdge> = graph
551 .edges
552 .iter()
553 .filter(|edge| edge.from == *node_id)
554 .collect();
555 if let Some(min_inputs) = node.input_contract.min_inputs {
556 if let Some(max_inputs) = node.input_contract.max_inputs {
557 if min_inputs > max_inputs {
558 errors.push(format!(
559 "node {node_id}: input contract min_inputs exceeds max_inputs"
560 ));
561 }
562 }
563 }
564 match node.kind.as_str() {
565 "condition" => {
566 let has_true = outgoing
567 .iter()
568 .any(|edge| edge.branch.as_deref() == Some("true"));
569 let has_false = outgoing
570 .iter()
571 .any(|edge| edge.branch.as_deref() == Some("false"));
572 if !has_true || !has_false {
573 errors.push(format!(
574 "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
575 ));
576 }
577 }
578 "fork" => {
579 if outgoing.len() < 2 {
580 errors.push(format!(
581 "node {node_id}: fork nodes require at least two outgoing edges"
582 ));
583 }
584 }
585 "join" => {
586 if incoming < 2 {
587 warnings.push(format!(
588 "node {node_id}: join node has fewer than two incoming edges"
589 ));
590 }
591 }
592 "map" => {
593 if node.map_policy.items.is_empty()
594 && node.map_policy.item_artifact_kind.is_none()
595 && node.input_contract.input_kinds.is_empty()
596 {
597 errors.push(format!(
598 "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
599 ));
600 }
601 }
602 "reduce" => {
603 if node.input_contract.input_kinds.is_empty() {
604 warnings.push(format!(
605 "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
606 ));
607 }
608 }
609 _ => {}
610 }
611 }
612
613 if let Some(ceiling) = ceiling {
614 if let Err(error) = ceiling.intersect(&graph.capability_policy) {
615 errors.push(error);
616 }
617 for (node_id, node) in &graph.nodes {
618 if let Err(error) = ceiling.intersect(&node.capability_policy) {
619 errors.push(format!("node {node_id}: {error}"));
620 }
621 }
622 }
623
624 WorkflowValidationReport {
625 valid: errors.is_empty(),
626 errors,
627 warnings,
628 reachable_nodes: reachable_nodes.into_iter().collect(),
629 }
630}
631
632fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
633 let mut seen = BTreeSet::new();
634 let mut stack = vec![graph.entry.clone()];
635 while let Some(node_id) = stack.pop() {
636 if !seen.insert(node_id.clone()) {
637 continue;
638 }
639 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
640 stack.push(edge.to.clone());
641 }
642 }
643 seen
644}
645
646pub async fn execute_stage_node(
647 node_id: &str,
648 node: &WorkflowNode,
649 task: &str,
650 artifacts: &[ArtifactRecord],
651 transcript: Option<VmValue>,
652) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
653 let mut selection_policy = node.context_policy.clone();
654 if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
655 selection_policy.include_kinds = node.input_contract.input_kinds.clone();
656 }
657 let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
658 let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
659 let input_transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
660 if node.input_contract.require_transcript && input_transcript.is_none() {
661 return Err(VmError::Runtime(format!(
662 "workflow stage {node_id} requires transcript input"
663 )));
664 }
665 if let Some(min_inputs) = node.input_contract.min_inputs {
666 if selected.len() < min_inputs {
667 return Err(VmError::Runtime(format!(
668 "workflow stage {node_id} requires at least {min_inputs} input artifacts"
669 )));
670 }
671 }
672 if let Some(max_inputs) = node.input_contract.max_inputs {
673 if selected.len() > max_inputs {
674 return Err(VmError::Runtime(format!(
675 "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
676 )));
677 }
678 }
679 let prompt = super::render_workflow_prompt(task, node.task_label.as_deref(), &rendered_context);
680
681 let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
682 .ok()
683 .filter(|value| !value.trim().is_empty())
684 .unwrap_or_else(|| {
685 let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
687 let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
688 crate::llm_config::default_tool_format(&model, &provider)
689 });
690 let mut llm_result = if node.kind == "verify" {
691 if let Some(command) = node
692 .verify
693 .as_ref()
694 .and_then(|verify| verify.as_object())
695 .and_then(|verify| verify.get("command"))
696 .and_then(|value| value.as_str())
697 .map(str::trim)
698 .filter(|value| !value.is_empty())
699 {
700 let mut process = if cfg!(target_os = "windows") {
701 let mut cmd = tokio::process::Command::new("cmd");
702 cmd.arg("/C").arg(command);
703 cmd
704 } else {
705 let mut cmd = tokio::process::Command::new("/bin/sh");
706 cmd.arg("-lc").arg(command);
707 cmd
708 };
709 process.stdin(std::process::Stdio::null());
710 if let Some(context) = crate::stdlib::process::current_execution_context() {
711 if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
712 process.current_dir(cwd);
713 }
714 if !context.env.is_empty() {
715 process.envs(context.env);
716 }
717 }
718 let output = process
719 .output()
720 .await
721 .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
722 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
723 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
724 let combined = if stderr.is_empty() {
725 stdout.clone()
726 } else if stdout.is_empty() {
727 stderr.clone()
728 } else {
729 format!("{stdout}\n{stderr}")
730 };
731 serde_json::json!({
732 "status": "completed",
733 "text": combined,
734 "visible_text": combined,
735 "command": command,
736 "stdout": stdout,
737 "stderr": stderr,
738 "exit_status": output.status.code().unwrap_or(-1),
739 "success": output.status.success(),
740 })
741 } else {
742 serde_json::json!({
743 "status": "completed",
744 "text": "",
745 "visible_text": "",
746 })
747 }
748 } else {
749 let mut options = BTreeMap::new();
750 if let Some(provider) = &node.model_policy.provider {
751 options.insert(
752 "provider".to_string(),
753 VmValue::String(Rc::from(provider.clone())),
754 );
755 }
756 if let Some(model) = &node.model_policy.model {
757 options.insert(
758 "model".to_string(),
759 VmValue::String(Rc::from(model.clone())),
760 );
761 }
762 if let Some(model_tier) = &node.model_policy.model_tier {
763 options.insert(
764 "model_tier".to_string(),
765 VmValue::String(Rc::from(model_tier.clone())),
766 );
767 }
768 if let Some(temperature) = node.model_policy.temperature {
769 options.insert("temperature".to_string(), VmValue::Float(temperature));
770 }
771 if let Some(max_tokens) = node.model_policy.max_tokens {
772 options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
773 }
774 let tool_names = workflow_tool_names(&node.tools);
775 let tools_value = node.raw_tools.clone().or_else(|| {
776 if matches!(node.tools, serde_json::Value::Null) {
777 None
778 } else {
779 Some(crate::stdlib::json_to_vm_value(&node.tools))
780 }
781 });
782 if tools_value.is_some() && !tool_names.is_empty() {
783 options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
784 }
785 if let Some(transcript) = input_transcript.clone() {
786 options.insert("transcript".to_string(), transcript);
787 }
788
789 let args = vec![
790 VmValue::String(Rc::from(prompt.clone())),
791 node.system
792 .clone()
793 .map(|s| VmValue::String(Rc::from(s)))
794 .unwrap_or(VmValue::Nil),
795 VmValue::Dict(Rc::new(options)),
796 ];
797 let mut opts = extract_llm_options(&args)?;
798
799 if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
800 let tool_policy = workflow_tool_policy_from_tools(&node.tools);
801 let effective_policy = tool_policy
802 .intersect(&node.capability_policy)
803 .map_err(VmError::Runtime)?;
804 let auto_compact = if node.transcript_policy.auto_compact {
806 let mut ac = crate::orchestration::AutoCompactConfig::default();
807 if let Some(v) = node.transcript_policy.compact_threshold {
808 ac.token_threshold = v;
809 }
810 if let Some(v) = node.transcript_policy.tool_output_max_chars {
811 ac.tool_output_max_chars = v;
812 }
813 if let Some(ref strategy) = node.transcript_policy.compact_strategy {
814 if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
815 ac.compact_strategy = s;
816 }
817 }
818 if let Some(v) = node.transcript_policy.hard_limit_tokens {
819 ac.hard_limit_tokens = Some(v);
820 }
821 if let Some(ref strategy) = node.transcript_policy.hard_limit_strategy {
822 if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
823 ac.hard_limit_strategy = s;
824 }
825 }
826 if let Some(ref raw_tp) = node.raw_transcript_policy {
828 if let Some(dict) = raw_tp.as_dict() {
829 if let Some(cb) = dict.get("compress_callback") {
830 ac.compress_callback = Some(cb.clone());
831 }
832 if let Some(cb) = dict.get("mask_callback") {
833 ac.mask_callback = Some(cb.clone());
834 }
835 }
836 }
837 {
839 let user_specified_threshold =
840 node.transcript_policy.compact_threshold.is_some();
841 let user_specified_hard_limit =
842 node.transcript_policy.hard_limit_tokens.is_some();
843 crate::llm::api::adapt_auto_compact_to_provider(
844 &mut ac,
845 user_specified_threshold,
846 user_specified_hard_limit,
847 &opts.provider,
848 &opts.model,
849 &opts.api_key,
850 )
851 .await;
852 }
853 Some(ac)
854 } else {
855 None
856 };
857 crate::llm::run_agent_loop_internal(
858 &mut opts,
859 crate::llm::AgentLoopConfig {
860 persistent: true,
861 max_iterations: node.model_policy.max_iterations.unwrap_or(16),
862 max_nudges: node.model_policy.max_nudges.unwrap_or(3),
863 nudge: node.model_policy.nudge.clone(),
864 done_sentinel: node.done_sentinel.clone(),
865 break_unless_phase: None,
866 tool_retries: 0,
867 tool_backoff_ms: 1000,
868 tool_format: tool_format.clone(),
869 auto_compact,
870 policy: Some(effective_policy),
871 approval_policy: Some(node.approval_policy.clone()),
872 daemon: false,
873 daemon_config: Default::default(),
874 llm_retries: 2,
875 llm_backoff_ms: 2000,
876 exit_when_verified: node.exit_when_verified,
877 loop_detect_warn: 2,
878 loop_detect_block: 3,
879 loop_detect_skip: 4,
880 tool_examples: node.model_policy.tool_examples.clone(),
881 turn_policy: node.model_policy.turn_policy.clone(),
882 stop_after_successful_tools: node
883 .model_policy
884 .stop_after_successful_tools
885 .clone(),
886 require_successful_tools: node.model_policy.require_successful_tools.clone(),
887 session_id: node
893 .raw_model_policy
894 .as_ref()
895 .and_then(|v| v.as_dict())
896 .and_then(|d| d.get("session_id"))
897 .and_then(|v| match v {
898 crate::value::VmValue::String(s) if !s.trim().is_empty() => {
899 Some(s.to_string())
900 }
901 _ => None,
902 })
903 .unwrap_or_else(|| format!("workflow_stage_{}", uuid::Uuid::now_v7())),
904 event_sink: None,
905 task_ledger: node
910 .raw_model_policy
911 .as_ref()
912 .and_then(|v| v.as_dict())
913 .and_then(|d| d.get("task_ledger"))
914 .map(crate::llm::helpers::vm_value_to_json)
915 .and_then(|json| serde_json::from_value(json).ok())
916 .unwrap_or_default(),
917 post_turn_callback: node
918 .raw_model_policy
919 .as_ref()
920 .and_then(|v| v.as_dict())
921 .and_then(|d| d.get("post_turn_callback"))
922 .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
923 .cloned(),
924 },
925 )
926 .await?
927 } else {
928 let result = vm_call_llm_full(&opts).await?;
929 crate::llm::agent_loop_result_from_llm(&result, opts)
930 }
931 };
932 if let Some(payload) = llm_result.as_object_mut() {
933 payload.insert("prompt".to_string(), serde_json::json!(prompt));
934 payload.insert(
935 "system_prompt".to_string(),
936 serde_json::json!(node.system.clone().unwrap_or_default()),
937 );
938 payload.insert(
939 "rendered_context".to_string(),
940 serde_json::json!(rendered_context),
941 );
942 payload.insert(
943 "selected_artifact_ids".to_string(),
944 serde_json::json!(selected
945 .iter()
946 .map(|artifact| artifact.id.clone())
947 .collect::<Vec<_>>()),
948 );
949 payload.insert(
950 "selected_artifact_titles".to_string(),
951 serde_json::json!(selected
952 .iter()
953 .map(|artifact| artifact.title.clone())
954 .collect::<Vec<_>>()),
955 );
956 payload.insert(
957 "tool_calling_mode".to_string(),
958 serde_json::json!(tool_format.clone()),
959 );
960 }
961
962 let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
963 let result_transcript = llm_result
967 .get("transcript")
968 .cloned()
969 .map(|value| crate::stdlib::json_to_vm_value(&value));
970 let transcript = apply_output_transcript_policy(
971 result_transcript.or(input_transcript),
972 &node.transcript_policy,
973 );
974 let output_kind = node
975 .output_contract
976 .output_kinds
977 .first()
978 .cloned()
979 .unwrap_or_else(|| {
980 if node.kind == "verify" {
981 "verification_result".to_string()
982 } else {
983 "artifact".to_string()
984 }
985 });
986 let mut metadata = BTreeMap::new();
987 metadata.insert(
988 "input_artifact_ids".to_string(),
989 serde_json::json!(selected
990 .iter()
991 .map(|artifact| artifact.id.clone())
992 .collect::<Vec<_>>()),
993 );
994 metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
995 let artifact = ArtifactRecord {
996 type_name: "artifact".to_string(),
997 id: new_id("artifact"),
998 kind: output_kind,
999 title: Some(format!("stage {node_id} output")),
1000 text: Some(visible_text),
1001 data: Some(llm_result.clone()),
1002 source: Some(node_id.to_string()),
1003 created_at: now_rfc3339(),
1004 freshness: Some("fresh".to_string()),
1005 priority: None,
1006 lineage: selected
1007 .iter()
1008 .map(|artifact| artifact.id.clone())
1009 .collect(),
1010 relevance: Some(1.0),
1011 estimated_tokens: None,
1012 stage: Some(node_id.to_string()),
1013 metadata,
1014 }
1015 .normalize();
1016
1017 Ok((llm_result, vec![artifact], transcript))
1018}
1019
1020pub fn next_nodes_for(
1021 graph: &WorkflowGraph,
1022 current: &str,
1023 branch: Option<&str>,
1024) -> Vec<WorkflowEdge> {
1025 let mut matching: Vec<WorkflowEdge> = graph
1026 .edges
1027 .iter()
1028 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1029 .cloned()
1030 .collect();
1031 if matching.is_empty() {
1032 matching = graph
1033 .edges
1034 .iter()
1035 .filter(|edge| edge.from == current && edge.branch.is_none())
1036 .cloned()
1037 .collect();
1038 }
1039 matching
1040}
1041
1042pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1043 next_nodes_for(graph, current, Some(branch))
1044 .into_iter()
1045 .next()
1046 .map(|edge| edge.to)
1047}
1048
1049pub fn append_audit_entry(
1050 graph: &mut WorkflowGraph,
1051 op: &str,
1052 node_id: Option<String>,
1053 reason: Option<String>,
1054 metadata: BTreeMap<String, serde_json::Value>,
1055) {
1056 graph.audit_log.push(WorkflowAuditEntry {
1057 id: new_id("audit"),
1058 op: op.to_string(),
1059 node_id,
1060 timestamp: now_rfc3339(),
1061 reason,
1062 metadata,
1063 });
1064}