1use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10 BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11 ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
/// One node of a workflow graph.
///
/// Deserialized with `#[serde(default)]`, so any field missing from the
/// input takes its `Default` value. The `raw_*` fields are never
/// (de)serialized; they keep the original VM values so that data which cannot
/// round-trip through JSON stays available at execution time.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier. `normalize_workflow_value` fills it from the map key
    /// when it is absent.
    pub id: Option<String>,
    /// Node kind. Kinds handled in this file include `"stage"`, `"verify"`,
    /// `"condition"`, `"fork"`, `"join"`, `"map"`, `"reduce"` and
    /// `"escalation"`; normalization defaults an empty kind to `"stage"`.
    pub kind: String,
    /// Execution mode; `Some("agent")` makes `execute_stage_node` run the
    /// agent loop even when no tools are declared.
    pub mode: Option<String>,
    pub prompt: Option<String>,
    /// System prompt forwarded to the LLM call.
    pub system: Option<String>,
    /// Label handed to the prompt renderer together with the task text.
    pub task_label: Option<String>,
    /// Sentinel forwarded to the agent loop configuration.
    pub done_sentinel: Option<String>,
    /// JSON description of the tools for this node (array, single tool
    /// object, or a `tool_registry` object — see `workflow_tool_names`).
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    pub auto_compact: AutoCompactPolicy,
    /// Visibility passed to `redact_transcript_visibility` when the stage
    /// transcript is returned.
    #[serde(default)]
    pub output_visibility: Option<String>,
    pub context_policy: ContextPolicy,
    pub retry_policy: RetryPolicy,
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub input_contract: StageContract,
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification settings; `execute_stage_node` reads the `command` key
    /// from this object for `"verify"` nodes.
    pub verify: Option<serde_json::Value>,
    /// Forwarded unchanged to the agent loop configuration.
    #[serde(default)]
    pub exit_when_verified: bool,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Original VM value of `tools`, preferred over the JSON form when the
    /// LLM options are built.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Original VM value of `auto_compact`; read for the `compress_callback`,
    /// `mask_callback` and `custom_compactor` entries.
    #[serde(skip)]
    pub raw_auto_compact: Option<VmValue>,
    /// Original VM value of `model_policy`; read for `session_id`,
    /// `task_ledger` and `post_turn_callback`.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
}
71
72impl PartialEq for WorkflowNode {
73 fn eq(&self, other: &Self) -> bool {
74 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
75 }
76}
77
78pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
79 match value {
80 serde_json::Value::Null => Vec::new(),
81 serde_json::Value::Array(items) => items
82 .iter()
83 .filter_map(|item| match item {
84 serde_json::Value::Object(map) => map
85 .get("name")
86 .and_then(|value| value.as_str())
87 .filter(|name| !name.is_empty())
88 .map(|name| name.to_string()),
89 _ => None,
90 })
91 .collect(),
92 serde_json::Value::Object(map) => {
93 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
94 return map
95 .get("tools")
96 .map(workflow_tool_names)
97 .unwrap_or_default();
98 }
99 map.get("name")
100 .and_then(|value| value.as_str())
101 .filter(|name| !name.is_empty())
102 .map(|name| vec![name.to_string()])
103 .unwrap_or_default()
104 }
105 _ => Vec::new(),
106 }
107}
108
/// Returns the most severe side-effect level from `levels`, or `None` when
/// the iterator is empty.
///
/// Severity order: `none` < `read_only` < `workspace_write` < `process_exec`
/// < `network`; any unrecognised label ranks above all of them. When several
/// levels share the highest rank, the last one encountered is returned.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    fn severity(label: &str) -> usize {
        match label {
            "none" => 0,
            "read_only" => 1,
            "workspace_write" => 2,
            "process_exec" => 3,
            "network" => 4,
            _ => 5,
        }
    }

    let mut strongest: Option<(usize, String)> = None;
    for level in levels {
        let score = severity(&level);
        // `>=` keeps the later element on ties.
        let replaces = strongest
            .as_ref()
            .map_or(true, |(best, _)| score >= *best);
        if replaces {
            strongest = Some((score, level));
        }
    }
    strongest.map(|(_, level)| level)
}
122
123fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
124 match value.and_then(|v| v.as_str()).unwrap_or("") {
125 "read" => ToolKind::Read,
126 "edit" => ToolKind::Edit,
127 "delete" => ToolKind::Delete,
128 "move" => ToolKind::Move,
129 "search" => ToolKind::Search,
130 "execute" => ToolKind::Execute,
131 "think" => ToolKind::Think,
132 "fetch" => ToolKind::Fetch,
133 _ => ToolKind::Other,
134 }
135}
136
137fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
138 let policy = map
139 .get("policy")
140 .and_then(|value| value.as_object())
141 .cloned()
142 .unwrap_or_default();
143
144 let capabilities = policy
145 .get("capabilities")
146 .and_then(|value| value.as_object())
147 .map(|caps| {
148 caps.iter()
149 .map(|(capability, ops)| {
150 let values = ops
151 .as_array()
152 .map(|items| {
153 items
154 .iter()
155 .filter_map(|item| item.as_str().map(|s| s.to_string()))
156 .collect::<Vec<_>>()
157 })
158 .unwrap_or_default();
159 (capability.clone(), values)
160 })
161 .collect::<BTreeMap<_, _>>()
162 })
163 .unwrap_or_default();
164
165 let arg_schema = if let Some(schema) = policy.get("arg_schema") {
168 serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
169 } else {
170 ToolArgSchema {
171 path_params: policy
172 .get("path_params")
173 .and_then(|value| value.as_array())
174 .map(|items| {
175 items
176 .iter()
177 .filter_map(|item| item.as_str().map(|s| s.to_string()))
178 .collect::<Vec<_>>()
179 })
180 .unwrap_or_default(),
181 arg_aliases: policy
182 .get("arg_aliases")
183 .and_then(|value| value.as_object())
184 .map(|aliases| {
185 aliases
186 .iter()
187 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
188 .collect::<BTreeMap<_, _>>()
189 })
190 .unwrap_or_default(),
191 required: policy
192 .get("required")
193 .and_then(|value| value.as_array())
194 .map(|items| {
195 items
196 .iter()
197 .filter_map(|item| item.as_str().map(|s| s.to_string()))
198 .collect::<Vec<_>>()
199 })
200 .unwrap_or_default(),
201 }
202 };
203
204 let kind = parse_tool_kind(policy.get("kind"));
205 let side_effect_level = policy
206 .get("side_effect_level")
207 .and_then(|value| value.as_str())
208 .map(SideEffectLevel::parse)
209 .unwrap_or_default();
210
211 ToolAnnotations {
212 kind,
213 side_effect_level,
214 arg_schema,
215 capabilities,
216 }
217}
218
219pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
220 match value {
221 serde_json::Value::Null => BTreeMap::new(),
222 serde_json::Value::Array(items) => items
223 .iter()
224 .filter_map(|item| match item {
225 serde_json::Value::Object(map) => map
226 .get("name")
227 .and_then(|value| value.as_str())
228 .filter(|name| !name.is_empty())
229 .map(|name| (name.to_string(), parse_tool_annotations(map))),
230 _ => None,
231 })
232 .collect(),
233 serde_json::Value::Object(map) => {
234 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
235 return map
236 .get("tools")
237 .map(workflow_tool_annotations)
238 .unwrap_or_default();
239 }
240 map.get("name")
241 .and_then(|value| value.as_str())
242 .filter(|name| !name.is_empty())
243 .map(|name| {
244 let mut annotations = BTreeMap::new();
245 annotations.insert(name.to_string(), parse_tool_annotations(map));
246 annotations
247 })
248 .unwrap_or_default()
249 }
250 _ => BTreeMap::new(),
251 }
252}
253
254pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
255 let tools = workflow_tool_names(value);
256 let tool_annotations = workflow_tool_annotations(value);
257 let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
258 for annotations in tool_annotations.values() {
259 for (capability, ops) in &annotations.capabilities {
260 let entry = capabilities.entry(capability.clone()).or_default();
261 for op in ops {
262 if !entry.contains(op) {
263 entry.push(op.clone());
264 }
265 }
266 entry.sort();
267 }
268 }
269 let side_effect_level = max_side_effect_level(
270 tool_annotations
271 .values()
272 .map(|annotations| annotations.side_effect_level.as_str().to_string())
273 .filter(|level| level != "none"),
274 );
275 CapabilityPolicy {
276 tools,
277 capabilities,
278 workspace_roots: Vec::new(),
279 side_effect_level,
280 recursion_limit: None,
281 tool_arg_constraints: Vec::new(),
282 tool_annotations,
283 }
284}
285
/// A directed edge between two workflow nodes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the source node.
    pub from: String,
    /// Id of the target node.
    pub to: String,
    /// Branch label this edge is taken for (e.g. `"true"`/`"false"` on
    /// condition nodes). `None` marks an unlabelled edge, which
    /// `next_nodes_for` uses as the fallback when no labelled edge matches.
    pub branch: Option<String>,
    pub label: Option<String>,
}
294
/// A complete workflow: nodes, edges, policies and an audit trail.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Serialized as `_type`; `normalize_workflow_value` sets it to
    /// `"workflow_graph"` when empty.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph identifier; generated during normalization when empty.
    pub id: String,
    pub name: Option<String>,
    /// Graph version; normalization raises `0` to `1`.
    pub version: usize,
    /// Id of the node where traversal starts.
    pub entry: String,
    /// Nodes keyed by node id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    /// Graph-level capability policy; `validate_workflow` intersects it with
    /// the optional ceiling.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Entries appended by `append_audit_entry`.
    pub audit_log: Vec<WorkflowAuditEntry>,
}
311
/// One record in a workflow graph's audit log.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    /// Entry identifier (generated with `new_id("audit")` by `append_audit_entry`).
    pub id: String,
    /// Name of the operation being recorded.
    pub op: String,
    /// Node the operation applied to, if any.
    pub node_id: Option<String>,
    /// Creation time (set from `now_rfc3339()` by `append_audit_entry`).
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
322
/// Result of `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// `true` exactly when `errors` is empty.
    pub valid: bool,
    /// Problems that make the graph invalid.
    pub errors: Vec<String>,
    /// Non-fatal findings (e.g. unreachable nodes).
    pub warnings: Vec<String>,
    /// Ids reached from the entry node by following edges (includes the
    /// entry id itself).
    pub reachable_nodes: Vec<String>,
}
331
332pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
333 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
334 let dict = value.as_dict();
335 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
336 node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
337 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
338 Ok(node)
339}
340
/// Parses a JSON value into a [`WorkflowNode`].
///
/// Delegates to `super::parse_json_payload`, passing `label` through. The
/// node's `raw_*` fields are `#[serde(skip)]` and therefore stay `None` on
/// this path.
pub fn parse_workflow_node_json(
    json: serde_json::Value,
    label: &str,
) -> Result<WorkflowNode, VmError> {
    super::parse_json_payload(json, label)
}
347
/// Parses a JSON value into a [`WorkflowEdge`].
///
/// Delegates to `super::parse_json_payload`, passing `label` through.
pub fn parse_workflow_edge_json(
    json: serde_json::Value,
    label: &str,
) -> Result<WorkflowEdge, VmError> {
    super::parse_json_payload(json, label)
}
354
355pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
356 let mut graph: WorkflowGraph = super::parse_json_value(value)?;
357 let as_dict = value.as_dict().cloned().unwrap_or_default();
358
359 if graph.nodes.is_empty() {
360 for key in ["act", "verify", "repair"] {
361 if let Some(node_value) = as_dict.get(key) {
362 let mut node = parse_workflow_node_value(node_value, "orchestration")?;
363 let raw_node = node_value.as_dict().cloned().unwrap_or_default();
364 node.id = Some(key.to_string());
365 if node.kind.is_empty() {
366 node.kind = if key == "verify" {
367 "verify".to_string()
368 } else {
369 "stage".to_string()
370 };
371 }
372 if node.model_policy.provider.is_none() {
373 node.model_policy.provider = as_dict
374 .get("provider")
375 .map(|value| value.display())
376 .filter(|value| !value.is_empty());
377 }
378 if node.model_policy.model.is_none() {
379 node.model_policy.model = as_dict
380 .get("model")
381 .map(|value| value.display())
382 .filter(|value| !value.is_empty());
383 }
384 if node.model_policy.model_tier.is_none() {
385 node.model_policy.model_tier = as_dict
386 .get("model_tier")
387 .or_else(|| as_dict.get("tier"))
388 .map(|value| value.display())
389 .filter(|value| !value.is_empty());
390 }
391 if node.model_policy.temperature.is_none() {
392 node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
393 if let VmValue::Float(number) = value {
394 Some(*number)
395 } else {
396 value.as_int().map(|number| number as f64)
397 }
398 });
399 }
400 if node.model_policy.max_tokens.is_none() {
401 node.model_policy.max_tokens =
402 as_dict.get("max_tokens").and_then(|value| value.as_int());
403 }
404 if node.mode.is_none() {
405 node.mode = as_dict
406 .get("mode")
407 .map(|value| value.display())
408 .filter(|value| !value.is_empty());
409 }
410 if node.done_sentinel.is_none() {
411 node.done_sentinel = as_dict
412 .get("done_sentinel")
413 .map(|value| value.display())
414 .filter(|value| !value.is_empty());
415 }
416 if key == "verify"
417 && node.verify.is_none()
418 && (raw_node.contains_key("assert_text")
419 || raw_node.contains_key("command")
420 || raw_node.contains_key("expect_status")
421 || raw_node.contains_key("expect_text"))
422 {
423 node.verify = Some(serde_json::json!({
424 "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
425 "command": raw_node.get("command").map(vm_value_to_json),
426 "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
427 "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
428 }));
429 }
430 graph.nodes.insert(key.to_string(), node);
431 }
432 }
433 if graph.entry.is_empty() && graph.nodes.contains_key("act") {
434 graph.entry = "act".to_string();
435 }
436 if graph.edges.is_empty() && graph.nodes.contains_key("act") {
437 if graph.nodes.contains_key("verify") {
438 graph.edges.push(WorkflowEdge {
439 from: "act".to_string(),
440 to: "verify".to_string(),
441 branch: None,
442 label: None,
443 });
444 }
445 if graph.nodes.contains_key("repair") {
446 graph.edges.push(WorkflowEdge {
447 from: "verify".to_string(),
448 to: "repair".to_string(),
449 branch: Some("failed".to_string()),
450 label: None,
451 });
452 graph.edges.push(WorkflowEdge {
453 from: "repair".to_string(),
454 to: "verify".to_string(),
455 branch: Some("retry".to_string()),
456 label: None,
457 });
458 }
459 }
460 }
461
462 if graph.type_name.is_empty() {
463 graph.type_name = "workflow_graph".to_string();
464 }
465 if graph.id.is_empty() {
466 graph.id = new_id("workflow");
467 }
468 if graph.version == 0 {
469 graph.version = 1;
470 }
471 if graph.entry.is_empty() {
472 graph.entry = graph
473 .nodes
474 .keys()
475 .next()
476 .cloned()
477 .unwrap_or_else(|| "act".to_string());
478 }
479 for (node_id, node) in &mut graph.nodes {
480 if node.raw_tools.is_none() {
481 node.raw_tools = as_dict
482 .get("nodes")
483 .and_then(|nodes| nodes.as_dict())
484 .and_then(|nodes| nodes.get(node_id))
485 .and_then(|node_value| node_value.as_dict())
486 .and_then(|raw_node| raw_node.get("tools"))
487 .cloned();
488 }
489 if node.id.is_none() {
490 node.id = Some(node_id.clone());
491 }
492 if node.kind.is_empty() {
493 node.kind = "stage".to_string();
494 }
495 if node.join_policy.strategy.is_empty() {
496 node.join_policy.strategy = "all".to_string();
497 }
498 if node.reduce_policy.strategy.is_empty() {
499 node.reduce_policy.strategy = "concat".to_string();
500 }
501 if node.output_contract.output_kinds.is_empty() {
502 node.output_contract.output_kinds = vec![match node.kind.as_str() {
503 "verify" => "verification_result".to_string(),
504 "reduce" => node
505 .reduce_policy
506 .output_kind
507 .clone()
508 .unwrap_or_else(|| "summary".to_string()),
509 "map" => node
510 .map_policy
511 .output_kind
512 .clone()
513 .unwrap_or_else(|| "artifact".to_string()),
514 "escalation" => "plan".to_string(),
515 _ => "artifact".to_string(),
516 }];
517 }
518 if node.retry_policy.max_attempts == 0 {
519 node.retry_policy.max_attempts = 1;
520 }
521 }
522 Ok(graph)
523}
524
525pub fn validate_workflow(
526 graph: &WorkflowGraph,
527 ceiling: Option<&CapabilityPolicy>,
528) -> WorkflowValidationReport {
529 let mut errors = Vec::new();
530 let mut warnings = Vec::new();
531
532 if !graph.nodes.contains_key(&graph.entry) {
533 errors.push(format!("entry node does not exist: {}", graph.entry));
534 }
535
536 let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
537 for edge in &graph.edges {
538 if !node_ids.contains(&edge.from) {
539 errors.push(format!("edge.from references unknown node: {}", edge.from));
540 }
541 if !node_ids.contains(&edge.to) {
542 errors.push(format!("edge.to references unknown node: {}", edge.to));
543 }
544 }
545
546 let reachable_nodes = reachable_nodes(graph);
547 for node_id in &node_ids {
548 if !reachable_nodes.contains(node_id) {
549 warnings.push(format!("node is unreachable: {node_id}"));
550 }
551 }
552
553 for (node_id, node) in &graph.nodes {
554 let incoming = graph
555 .edges
556 .iter()
557 .filter(|edge| edge.to == *node_id)
558 .count();
559 let outgoing: Vec<&WorkflowEdge> = graph
560 .edges
561 .iter()
562 .filter(|edge| edge.from == *node_id)
563 .collect();
564 if let Some(min_inputs) = node.input_contract.min_inputs {
565 if let Some(max_inputs) = node.input_contract.max_inputs {
566 if min_inputs > max_inputs {
567 errors.push(format!(
568 "node {node_id}: input contract min_inputs exceeds max_inputs"
569 ));
570 }
571 }
572 }
573 match node.kind.as_str() {
574 "condition" => {
575 let has_true = outgoing
576 .iter()
577 .any(|edge| edge.branch.as_deref() == Some("true"));
578 let has_false = outgoing
579 .iter()
580 .any(|edge| edge.branch.as_deref() == Some("false"));
581 if !has_true || !has_false {
582 errors.push(format!(
583 "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
584 ));
585 }
586 }
587 "fork" if outgoing.len() < 2 => {
588 errors.push(format!(
589 "node {node_id}: fork nodes require at least two outgoing edges"
590 ));
591 }
592 "join" if incoming < 2 => {
593 warnings.push(format!(
594 "node {node_id}: join node has fewer than two incoming edges"
595 ));
596 }
597 "map"
598 if node.map_policy.items.is_empty()
599 && node.map_policy.item_artifact_kind.is_none()
600 && node.input_contract.input_kinds.is_empty() =>
601 {
602 errors.push(format!(
603 "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
604 ));
605 }
606 "reduce" if node.input_contract.input_kinds.is_empty() => {
607 warnings.push(format!(
608 "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
609 ));
610 }
611 _ => {}
612 }
613 }
614
615 if let Some(ceiling) = ceiling {
616 if let Err(error) = ceiling.intersect(&graph.capability_policy) {
617 errors.push(error);
618 }
619 for (node_id, node) in &graph.nodes {
620 if let Err(error) = ceiling.intersect(&node.capability_policy) {
621 errors.push(format!("node {node_id}: {error}"));
622 }
623 }
624 }
625
626 WorkflowValidationReport {
627 valid: errors.is_empty(),
628 errors,
629 warnings,
630 reachable_nodes: reachable_nodes.into_iter().collect(),
631 }
632}
633
634fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
635 let mut seen = BTreeSet::new();
636 let mut stack = vec![graph.entry.clone()];
637 while let Some(node_id) = stack.pop() {
638 if !seen.insert(node_id.clone()) {
639 continue;
640 }
641 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
642 stack.push(edge.to.clone());
643 }
644 }
645 seen
646}
647
648fn resolve_node_session_id(node: &WorkflowNode) -> String {
654 if let Some(explicit) = node
655 .raw_model_policy
656 .as_ref()
657 .and_then(|v| v.as_dict())
658 .and_then(|d| d.get("session_id"))
659 .and_then(|v| match v {
660 VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
661 _ => None,
662 })
663 {
664 return explicit;
665 }
666 format!("workflow_stage_{}", uuid::Uuid::now_v7())
667}
668
/// Executes one workflow node and packages its output as an artifact.
///
/// Flow:
/// 1. Select input artifacts (the context policy's `include_kinds` falls back
///    to the input contract's `input_kinds`) and render them as context.
/// 2. Enforce the input contract: an existing session when
///    `require_transcript` is set, and the `min_inputs` / `max_inputs` bounds.
/// 3. Produce a result:
///    * `"verify"` nodes with a non-blank `verify.command` run that command
///      through the platform shell and capture stdout/stderr/exit status;
///      `"verify"` nodes without a command yield an empty completed result;
///    * all other nodes call the LLM — through the agent loop when the mode
///      is `"agent"` or tools are declared, otherwise as a single call.
/// 4. Annotate the result with the prompt, context and selected artifacts,
///    and wrap it in a single [`ArtifactRecord`].
///
/// Returns `(result_json, artifacts, transcript)` where `artifacts` holds the
/// one artifact created for this stage and `transcript` is the redacted
/// result transcript, or the session snapshot when the result has none.
///
/// # Errors
/// Returns `VmError::Runtime` when the input contract is violated, when the
/// verify command cannot be spawned, or when the tool policy cannot be
/// intersected with the node's capability policy; errors from option
/// extraction and the LLM / agent loop calls are propagated.
pub async fn execute_stage_node(
    node_id: &str,
    node: &WorkflowNode,
    task: &str,
    artifacts: &[ArtifactRecord],
) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
    let mut selection_policy = node.context_policy.clone();
    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
    }
    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
    // Rendering uses the node's own context policy, not the adjusted selection policy.
    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
    let stage_session_id = resolve_node_session_id(node);
    if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
        return Err(VmError::Runtime(format!(
            "workflow stage {node_id} requires an existing session \
             (call agent_session_open and feed session_id through model_policy \
             before entering this stage)"
        )));
    }
    if let Some(min_inputs) = node.input_contract.min_inputs {
        if selected.len() < min_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
            )));
        }
    }
    if let Some(max_inputs) = node.input_contract.max_inputs {
        if selected.len() > max_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
            )));
        }
    }
    let prompt = super::render_workflow_prompt(task, node.task_label.as_deref(), &rendered_context);

    // An explicit HARN_AGENT_TOOL_FORMAT wins; otherwise derive the default
    // from the model/provider named in the environment.
    let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
        .ok()
        .filter(|value| !value.trim().is_empty())
        .unwrap_or_else(|| {
            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
            crate::llm_config::default_tool_format(&model, &provider)
        });
    let mut llm_result = if node.kind == "verify" {
        if let Some(command) = node
            .verify
            .as_ref()
            .and_then(|verify| verify.as_object())
            .and_then(|verify| verify.get("command"))
            .and_then(|value| value.as_str())
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            // Command-based verification: run through `cmd /C` on Windows
            // and `/bin/sh -lc` elsewhere.
            let mut process = if cfg!(target_os = "windows") {
                let mut cmd = tokio::process::Command::new("cmd");
                cmd.arg("/C").arg(command);
                cmd
            } else {
                let mut cmd = tokio::process::Command::new("/bin/sh");
                cmd.arg("-lc").arg(command);
                cmd
            };
            process.stdin(std::process::Stdio::null());
            // Apply the working directory / environment of the current
            // execution context, when one is available.
            if let Some(context) = crate::stdlib::process::current_execution_context() {
                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
                    process.current_dir(cwd);
                }
                if !context.env.is_empty() {
                    process.envs(context.env);
                }
            }
            let output = process
                .output()
                .await
                .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
            // Combined text is stdout followed by stderr, omitting whichever is empty.
            let combined = if stderr.is_empty() {
                stdout.clone()
            } else if stdout.is_empty() {
                stderr.clone()
            } else {
                format!("{stdout}\n{stderr}")
            };
            serde_json::json!({
                "status": "completed",
                "text": combined,
                "visible_text": combined,
                "command": command,
                "stdout": stdout,
                "stderr": stderr,
                "exit_status": output.status.code().unwrap_or(-1),
                "success": output.status.success(),
            })
        } else {
            // Verify node without a command: nothing to run.
            serde_json::json!({
                "status": "completed",
                "text": "",
                "visible_text": "",
            })
        }
    } else {
        // LLM-backed stage: translate the model policy into call options.
        let mut options = BTreeMap::new();
        if let Some(provider) = &node.model_policy.provider {
            options.insert(
                "provider".to_string(),
                VmValue::String(Rc::from(provider.clone())),
            );
        }
        if let Some(model) = &node.model_policy.model {
            options.insert(
                "model".to_string(),
                VmValue::String(Rc::from(model.clone())),
            );
        }
        if let Some(model_tier) = &node.model_policy.model_tier {
            options.insert(
                "model_tier".to_string(),
                VmValue::String(Rc::from(model_tier.clone())),
            );
        }
        if let Some(temperature) = node.model_policy.temperature {
            options.insert("temperature".to_string(), VmValue::Float(temperature));
        }
        if let Some(max_tokens) = node.model_policy.max_tokens {
            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
        }
        let tool_names = workflow_tool_names(&node.tools);
        // Prefer the raw VM tools value; fall back to converting the JSON form.
        let tools_value = node.raw_tools.clone().or_else(|| {
            if matches!(node.tools, serde_json::Value::Null) {
                None
            } else {
                Some(crate::stdlib::json_to_vm_value(&node.tools))
            }
        });
        if tools_value.is_some() && !tool_names.is_empty() {
            // `is_some()` was checked above, so the `Nil` fallback is never used.
            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
        }
        options.insert(
            "session_id".to_string(),
            VmValue::String(Rc::from(stage_session_id.clone())),
        );

        let args = vec![
            VmValue::String(Rc::from(prompt.clone())),
            node.system
                .clone()
                .map(|s| VmValue::String(Rc::from(s)))
                .unwrap_or(VmValue::Nil),
            VmValue::Dict(Rc::new(options)),
        ];
        let mut opts = extract_llm_options(&args)?;

        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
            // Agent loop: the effective policy is what the tools declare,
            // narrowed by the node's own capability policy.
            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
            let effective_policy = tool_policy
                .intersect(&node.capability_policy)
                .map_err(VmError::Runtime)?;
            let auto_compact = if node.auto_compact.enabled {
                let mut ac = crate::orchestration::AutoCompactConfig::default();
                if let Some(v) = node.auto_compact.token_threshold {
                    ac.token_threshold = v;
                }
                if let Some(v) = node.auto_compact.tool_output_max_chars {
                    ac.tool_output_max_chars = v;
                }
                // Unparseable strategy names are ignored and the default is kept.
                if let Some(ref strategy) = node.auto_compact.compact_strategy {
                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
                        ac.compact_strategy = s;
                    }
                }
                if let Some(v) = node.auto_compact.hard_limit_tokens {
                    ac.hard_limit_tokens = Some(v);
                }
                if let Some(ref strategy) = node.auto_compact.hard_limit_strategy {
                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
                        ac.hard_limit_strategy = s;
                    }
                }
                // Callbacks are taken from the raw VM dict rather than the
                // deserialized policy.
                if let Some(ref raw_ac) = node.raw_auto_compact {
                    if let Some(dict) = raw_ac.as_dict() {
                        if let Some(cb) = dict.get("compress_callback") {
                            ac.compress_callback = Some(cb.clone());
                        }
                        if let Some(cb) = dict.get("mask_callback") {
                            ac.mask_callback = Some(cb.clone());
                        }
                        if let Some(cb) = dict.get("custom_compactor") {
                            ac.custom_compactor = Some(cb.clone());
                        }
                    }
                }
                {
                    // Tell the adapter which limits the user set explicitly.
                    let user_specified_threshold = node.auto_compact.token_threshold.is_some();
                    let user_specified_hard_limit = node.auto_compact.hard_limit_tokens.is_some();
                    crate::llm::api::adapt_auto_compact_to_provider(
                        &mut ac,
                        user_specified_threshold,
                        user_specified_hard_limit,
                        &opts.provider,
                        &opts.model,
                        &opts.api_key,
                    )
                    .await;
                }
                Some(ac)
            } else {
                None
            };
            crate::llm::run_agent_loop_internal(
                &mut opts,
                crate::llm::AgentLoopConfig {
                    persistent: true,
                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
                    nudge: node.model_policy.nudge.clone(),
                    done_sentinel: node.done_sentinel.clone(),
                    break_unless_phase: None,
                    tool_retries: 0,
                    tool_backoff_ms: 1000,
                    tool_format: tool_format.clone(),
                    auto_compact,
                    policy: Some(effective_policy),
                    approval_policy: Some(node.approval_policy.clone()),
                    daemon: false,
                    daemon_config: Default::default(),
                    llm_retries: 2,
                    llm_backoff_ms: 2000,
                    exit_when_verified: node.exit_when_verified,
                    loop_detect_warn: 2,
                    loop_detect_block: 3,
                    loop_detect_skip: 4,
                    tool_examples: node.model_policy.tool_examples.clone(),
                    turn_policy: node.model_policy.turn_policy.clone(),
                    stop_after_successful_tools: node
                        .model_policy
                        .stop_after_successful_tools
                        .clone(),
                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
                    session_id: stage_session_id.clone(),
                    event_sink: None,
                    // Read from the raw model policy; a missing or
                    // undecodable ledger falls back to the default.
                    task_ledger: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("task_ledger"))
                        .map(crate::llm::helpers::vm_value_to_json)
                        .and_then(|json| serde_json::from_value(json).ok())
                        .unwrap_or_default(),
                    // Only closure values are accepted as a post-turn callback.
                    post_turn_callback: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("post_turn_callback"))
                        .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
                        .cloned(),
                },
            )
            .await?
        } else {
            // Plain single LLM call.
            let result = vm_call_llm_full(&opts).await?;
            crate::llm::agent_loop_result_from_llm(&result, opts)
        }
    };
    // Attach stage bookkeeping to the result when it is a JSON object.
    if let Some(payload) = llm_result.as_object_mut() {
        payload.insert("prompt".to_string(), serde_json::json!(prompt));
        payload.insert(
            "system_prompt".to_string(),
            serde_json::json!(node.system.clone().unwrap_or_default()),
        );
        payload.insert(
            "rendered_context".to_string(),
            serde_json::json!(rendered_context),
        );
        payload.insert(
            "selected_artifact_ids".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.id.clone())
                .collect::<Vec<_>>()),
        );
        payload.insert(
            "selected_artifact_titles".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.title.clone())
                .collect::<Vec<_>>()),
        );
        payload.insert(
            "tool_calling_mode".to_string(),
            serde_json::json!(tool_format.clone()),
        );
    }

    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
    // Prefer the transcript embedded in the result; otherwise use the
    // session snapshot. Either is redacted per `output_visibility`.
    let result_transcript = llm_result
        .get("transcript")
        .cloned()
        .map(|value| crate::stdlib::json_to_vm_value(&value));
    let session_transcript = crate::agent_sessions::snapshot(&stage_session_id);
    let transcript = result_transcript
        .or(session_transcript)
        .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
    // The first declared output kind wins; otherwise fall back by node kind.
    let output_kind = node
        .output_contract
        .output_kinds
        .first()
        .cloned()
        .unwrap_or_else(|| {
            if node.kind == "verify" {
                "verification_result".to_string()
            } else {
                "artifact".to_string()
            }
        });
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "input_artifact_ids".to_string(),
        serde_json::json!(selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect::<Vec<_>>()),
    );
    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
    let artifact = ArtifactRecord {
        type_name: "artifact".to_string(),
        id: new_id("artifact"),
        kind: output_kind,
        title: Some(format!("stage {node_id} output")),
        text: Some(visible_text),
        data: Some(llm_result.clone()),
        source: Some(node_id.to_string()),
        created_at: now_rfc3339(),
        freshness: Some("fresh".to_string()),
        priority: None,
        // Lineage records the ids of the artifacts this stage consumed.
        lineage: selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect(),
        relevance: Some(1.0),
        estimated_tokens: None,
        stage: Some(node_id.to_string()),
        metadata,
    }
    .normalize();

    Ok((llm_result, vec![artifact], transcript))
}
1029
1030pub fn next_nodes_for(
1031 graph: &WorkflowGraph,
1032 current: &str,
1033 branch: Option<&str>,
1034) -> Vec<WorkflowEdge> {
1035 let mut matching: Vec<WorkflowEdge> = graph
1036 .edges
1037 .iter()
1038 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1039 .cloned()
1040 .collect();
1041 if matching.is_empty() {
1042 matching = graph
1043 .edges
1044 .iter()
1045 .filter(|edge| edge.from == current && edge.branch.is_none())
1046 .cloned()
1047 .collect();
1048 }
1049 matching
1050}
1051
1052pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1053 next_nodes_for(graph, current, Some(branch))
1054 .into_iter()
1055 .next()
1056 .map(|edge| edge.to)
1057}
1058
1059pub fn append_audit_entry(
1060 graph: &mut WorkflowGraph,
1061 op: &str,
1062 node_id: Option<String>,
1063 reason: Option<String>,
1064 metadata: BTreeMap<String, serde_json::Value>,
1065) {
1066 graph.audit_log.push(WorkflowAuditEntry {
1067 id: new_id("audit"),
1068 op: op.to_string(),
1069 node_id,
1070 timestamp: now_rfc3339(),
1071 reason,
1072 metadata,
1073 });
1074}