Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10    BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11    ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
/// A single node (stage) of a [`WorkflowGraph`].
///
/// Deserialized with `#[serde(default)]`, so every field may be omitted in the
/// input. `normalize_workflow_value` fills in `id`, `kind`,
/// `join_policy.strategy`, `reduce_policy.strategy`,
/// `output_contract.output_kinds` and `retry_policy.max_attempts` when they
/// are empty. Equality (see the `PartialEq` impl) compares the serde
/// representation, so the `#[serde(skip)]` `raw_*` fields do not participate.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node id; normalization sets it to the node's key when absent.
    pub id: Option<String>,
    /// Node kind. Kinds with special handling in this file include "stage"
    /// (the normalization default), "verify", "condition", "fork", "join",
    /// "map", "reduce" and "escalation".
    pub kind: String,
    /// Execution mode. `execute_stage_node` treats `Some("agent")` like a
    /// non-empty tool list when choosing its tool/agent branch.
    pub mode: Option<String>,
    pub prompt: Option<String>,
    /// System prompt, passed as the second argument of the stage's LLM call.
    pub system: Option<String>,
    /// Label forwarded to `render_workflow_prompt` together with the task.
    pub task_label: Option<String>,
    /// Done sentinel; for the legacy act/verify/repair shorthand it falls
    /// back to the workflow-level `done_sentinel`.
    pub done_sentinel: Option<String>,
    /// Tool specification as JSON: an array of tool objects, a single tool
    /// object, or a `{"_type": "tool_registry", "tools": ...}` wrapper (see
    /// `workflow_tool_names` / `workflow_tool_annotations`).
    pub tools: serde_json::Value,
    /// Provider / model / tier / temperature / max_tokens used to build the
    /// stage's LLM options.
    pub model_policy: ModelPolicy,
    /// Per-stage auto-compaction settings for the agent loop's context
    /// window. Lifecycle operations (reset, fork, trim, compact) are NOT
    /// expressible here — call the `agent_session_*` builtins before the
    /// stage or in a prior stage.
    pub auto_compact: AutoCompactPolicy,
    /// Output visibility filter applied to the transcript after the
    /// stage's agent loop exits. `"public"` / `"public_only"` drops
    /// `tool_result` messages and non-public events. `None` or any
    /// unknown string is a no-op.
    #[serde(default)]
    pub output_visibility: Option<String>,
    /// Artifact selection/rendering policy for the stage's prompt context.
    pub context_policy: ContextPolicy,
    /// Retry settings; a `max_attempts` of 0 is normalized to 1.
    pub retry_policy: RetryPolicy,
    /// Per-node capability policy. It is checked against the optional ceiling
    /// in `validate_workflow` and intersected with the tool-derived policy at
    /// execution time.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    /// Input constraints (`min_inputs`, `max_inputs`, `input_kinds`,
    /// `require_transcript`) checked during validation and stage execution.
    pub input_contract: StageContract,
    /// Output contract; `output_kinds` gets a kind-dependent default during
    /// normalization.
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    /// Map settings (`items`, `item_artifact_kind`, `output_kind`) for
    /// "map" nodes.
    pub map_policy: MapPolicy,
    /// Join settings; `strategy` defaults to "all".
    pub join_policy: JoinPolicy,
    /// Reduce settings; `strategy` defaults to "concat" and `output_kind`
    /// feeds the default output kind of "reduce" nodes.
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification spec for "verify" nodes. A non-blank `command` string in
    /// this object is run through the platform shell by `execute_stage_node`.
    pub verify: Option<serde_json::Value>,
    /// When true, the stage's agent loop gates the done sentinel on the most
    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
    /// persistent execute stages that fold verification into the loop via a
    /// shell-exec tool the model invokes explicitly.
    #[serde(default)]
    pub exit_when_verified: bool,
    /// Arbitrary JSON metadata attached to the node.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Raw `tools` VmValue (not serialized). It is forwarded to the LLM call in
    /// preference to re-converting the JSON `tools`.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Raw auto_compact VmValue dict — preserved for extracting closure
    /// fields (compress_callback, mask_callback, custom_compactor) that
    /// can't go through serde.
    #[serde(skip)]
    pub raw_auto_compact: Option<VmValue>,
    /// Raw model_policy VmValue dict — preserved for extracting closure
    /// fields (post_turn_callback) that can't go through serde.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
}
71
72impl PartialEq for WorkflowNode {
73    fn eq(&self, other: &Self) -> bool {
74        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
75    }
76}
77
78pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
79    match value {
80        serde_json::Value::Null => Vec::new(),
81        serde_json::Value::Array(items) => items
82            .iter()
83            .filter_map(|item| match item {
84                serde_json::Value::Object(map) => map
85                    .get("name")
86                    .and_then(|value| value.as_str())
87                    .filter(|name| !name.is_empty())
88                    .map(|name| name.to_string()),
89                _ => None,
90            })
91            .collect(),
92        serde_json::Value::Object(map) => {
93            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
94                return map
95                    .get("tools")
96                    .map(workflow_tool_names)
97                    .unwrap_or_default();
98            }
99            map.get("name")
100                .and_then(|value| value.as_str())
101                .filter(|name| !name.is_empty())
102                .map(|name| vec![name.to_string()])
103                .unwrap_or_default()
104        }
105        _ => Vec::new(),
106    }
107}
108
/// Return the most severe side-effect level among `levels`.
///
/// Severity ascends `none` < `read_only` < `workspace_write` < `process_exec`
/// < `network`. Any unrecognized level ranks above `network`, so it is treated
/// as the most severe. Ties resolve to the last such level, and an empty
/// iterator yields `None`.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const SEVERITY: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    let severity_of = |level: &String| {
        SEVERITY
            .iter()
            .position(|known| *known == level.as_str())
            .unwrap_or(SEVERITY.len())
    };
    levels.max_by_key(severity_of)
}
122
123fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
124    match value.and_then(|v| v.as_str()).unwrap_or("") {
125        "read" => ToolKind::Read,
126        "edit" => ToolKind::Edit,
127        "delete" => ToolKind::Delete,
128        "move" => ToolKind::Move,
129        "search" => ToolKind::Search,
130        "execute" => ToolKind::Execute,
131        "think" => ToolKind::Think,
132        "fetch" => ToolKind::Fetch,
133        _ => ToolKind::Other,
134    }
135}
136
137fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
138    let policy = map
139        .get("policy")
140        .and_then(|value| value.as_object())
141        .cloned()
142        .unwrap_or_default();
143
144    let capabilities = policy
145        .get("capabilities")
146        .and_then(|value| value.as_object())
147        .map(|caps| {
148            caps.iter()
149                .map(|(capability, ops)| {
150                    let values = ops
151                        .as_array()
152                        .map(|items| {
153                            items
154                                .iter()
155                                .filter_map(|item| item.as_str().map(|s| s.to_string()))
156                                .collect::<Vec<_>>()
157                        })
158                        .unwrap_or_default();
159                    (capability.clone(), values)
160                })
161                .collect::<BTreeMap<_, _>>()
162        })
163        .unwrap_or_default();
164
165    // Accept both the structured `policy.arg_schema` object and the legacy
166    // flat fields on `policy` so pipelines can migrate gradually.
167    let arg_schema = if let Some(schema) = policy.get("arg_schema") {
168        serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
169    } else {
170        ToolArgSchema {
171            path_params: policy
172                .get("path_params")
173                .and_then(|value| value.as_array())
174                .map(|items| {
175                    items
176                        .iter()
177                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
178                        .collect::<Vec<_>>()
179                })
180                .unwrap_or_default(),
181            arg_aliases: policy
182                .get("arg_aliases")
183                .and_then(|value| value.as_object())
184                .map(|aliases| {
185                    aliases
186                        .iter()
187                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
188                        .collect::<BTreeMap<_, _>>()
189                })
190                .unwrap_or_default(),
191            required: policy
192                .get("required")
193                .and_then(|value| value.as_array())
194                .map(|items| {
195                    items
196                        .iter()
197                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
198                        .collect::<Vec<_>>()
199                })
200                .unwrap_or_default(),
201        }
202    };
203
204    let kind = parse_tool_kind(policy.get("kind"));
205    let side_effect_level = policy
206        .get("side_effect_level")
207        .and_then(|value| value.as_str())
208        .map(SideEffectLevel::parse)
209        .unwrap_or_default();
210
211    ToolAnnotations {
212        kind,
213        side_effect_level,
214        arg_schema,
215        capabilities,
216    }
217}
218
219pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
220    match value {
221        serde_json::Value::Null => BTreeMap::new(),
222        serde_json::Value::Array(items) => items
223            .iter()
224            .filter_map(|item| match item {
225                serde_json::Value::Object(map) => map
226                    .get("name")
227                    .and_then(|value| value.as_str())
228                    .filter(|name| !name.is_empty())
229                    .map(|name| (name.to_string(), parse_tool_annotations(map))),
230                _ => None,
231            })
232            .collect(),
233        serde_json::Value::Object(map) => {
234            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
235                return map
236                    .get("tools")
237                    .map(workflow_tool_annotations)
238                    .unwrap_or_default();
239            }
240            map.get("name")
241                .and_then(|value| value.as_str())
242                .filter(|name| !name.is_empty())
243                .map(|name| {
244                    let mut annotations = BTreeMap::new();
245                    annotations.insert(name.to_string(), parse_tool_annotations(map));
246                    annotations
247                })
248                .unwrap_or_default()
249        }
250        _ => BTreeMap::new(),
251    }
252}
253
254pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
255    let tools = workflow_tool_names(value);
256    let tool_annotations = workflow_tool_annotations(value);
257    let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
258    for annotations in tool_annotations.values() {
259        for (capability, ops) in &annotations.capabilities {
260            let entry = capabilities.entry(capability.clone()).or_default();
261            for op in ops {
262                if !entry.contains(op) {
263                    entry.push(op.clone());
264                }
265            }
266            entry.sort();
267        }
268    }
269    let side_effect_level = max_side_effect_level(
270        tool_annotations
271            .values()
272            .map(|annotations| annotations.side_effect_level.as_str().to_string())
273            .filter(|level| level != "none"),
274    );
275    CapabilityPolicy {
276        tools,
277        capabilities,
278        workspace_roots: Vec::new(),
279        side_effect_level,
280        recursion_limit: None,
281        tool_arg_constraints: Vec::new(),
282        tool_annotations,
283    }
284}
285
/// A directed edge between two workflow nodes, identified by node key.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Key of the source node.
    pub from: String,
    /// Key of the target node.
    pub to: String,
    /// Optional branch label. `condition` nodes must have outgoing edges
    /// labelled "true" and "false"; the legacy act/verify/repair shorthand
    /// uses "failed" and "retry".
    pub branch: Option<String>,
    /// Optional display label; not interpreted by the validation code here.
    pub label: Option<String>,
}
294
/// A complete workflow: nodes keyed by id, directed edges, an entry node and
/// graph-wide policies.
///
/// After `normalize_workflow_value`:
/// - an empty `type_name` becomes "workflow_graph";
/// - an empty `id` becomes a fresh id;
/// - a `version` of 0 becomes 1;
/// - an empty `entry` becomes the `act` stage (legacy shorthand), otherwise
///   the first node key, or "act" when there are no nodes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Type tag, serialized under the `_type` key.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    pub version: usize,
    /// Key of the node execution starts from; `validate_workflow` reports an
    /// error if no such node exists.
    pub entry: String,
    /// Nodes keyed by node id, iterated in sorted key order (`BTreeMap`).
    pub nodes: BTreeMap<String, WorkflowNode>,
    /// Directed edges between node keys.
    pub edges: Vec<WorkflowEdge>,
    /// Graph-wide capability policy; checked against the optional ceiling in
    /// `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    /// Arbitrary JSON metadata attached to the graph.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Audit entries (see `WorkflowAuditEntry`); not modified by the
    /// normalization or validation code in this file.
    pub audit_log: Vec<WorkflowAuditEntry>,
}
311
/// One recorded operation in a [`WorkflowGraph`]'s `audit_log`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    /// Operation name (the set of values is not defined in this section).
    pub op: String,
    /// Node the operation concerned, presumably `None` for graph-level ops.
    pub node_id: Option<String>,
    /// Timestamp string; presumably RFC 3339 (cf. the `now_rfc3339` import)
    /// — TODO confirm against the writer.
    pub timestamp: String,
    pub reason: Option<String>,
    /// Arbitrary JSON metadata for the entry.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
322
/// Result of `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// True exactly when `errors` is empty; warnings do not affect validity.
    pub valid: bool,
    /// Structural or policy problems that make the graph invalid.
    pub errors: Vec<String>,
    /// Non-fatal findings (unreachable nodes, under-fed joins, reduce nodes
    /// without input kinds).
    pub warnings: Vec<String>,
    /// Sorted ids reachable from the entry by following edges. This always
    /// includes the entry id, even if no node with that id exists.
    pub reachable_nodes: Vec<String>,
}
331
332pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
333    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
334    let dict = value.as_dict();
335    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
336    node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
337    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
338    Ok(node)
339}
340
341pub fn parse_workflow_node_json(
342    json: serde_json::Value,
343    label: &str,
344) -> Result<WorkflowNode, VmError> {
345    super::parse_json_payload(json, label)
346}
347
348pub fn parse_workflow_edge_json(
349    json: serde_json::Value,
350    label: &str,
351) -> Result<WorkflowEdge, VmError> {
352    super::parse_json_payload(json, label)
353}
354
355pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
356    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
357    let as_dict = value.as_dict().cloned().unwrap_or_default();
358
359    if graph.nodes.is_empty() {
360        for key in ["act", "verify", "repair"] {
361            if let Some(node_value) = as_dict.get(key) {
362                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
363                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
364                node.id = Some(key.to_string());
365                if node.kind.is_empty() {
366                    node.kind = if key == "verify" {
367                        "verify".to_string()
368                    } else {
369                        "stage".to_string()
370                    };
371                }
372                if node.model_policy.provider.is_none() {
373                    node.model_policy.provider = as_dict
374                        .get("provider")
375                        .map(|value| value.display())
376                        .filter(|value| !value.is_empty());
377                }
378                if node.model_policy.model.is_none() {
379                    node.model_policy.model = as_dict
380                        .get("model")
381                        .map(|value| value.display())
382                        .filter(|value| !value.is_empty());
383                }
384                if node.model_policy.model_tier.is_none() {
385                    node.model_policy.model_tier = as_dict
386                        .get("model_tier")
387                        .or_else(|| as_dict.get("tier"))
388                        .map(|value| value.display())
389                        .filter(|value| !value.is_empty());
390                }
391                if node.model_policy.temperature.is_none() {
392                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
393                        if let VmValue::Float(number) = value {
394                            Some(*number)
395                        } else {
396                            value.as_int().map(|number| number as f64)
397                        }
398                    });
399                }
400                if node.model_policy.max_tokens.is_none() {
401                    node.model_policy.max_tokens =
402                        as_dict.get("max_tokens").and_then(|value| value.as_int());
403                }
404                if node.mode.is_none() {
405                    node.mode = as_dict
406                        .get("mode")
407                        .map(|value| value.display())
408                        .filter(|value| !value.is_empty());
409                }
410                if node.done_sentinel.is_none() {
411                    node.done_sentinel = as_dict
412                        .get("done_sentinel")
413                        .map(|value| value.display())
414                        .filter(|value| !value.is_empty());
415                }
416                if key == "verify"
417                    && node.verify.is_none()
418                    && (raw_node.contains_key("assert_text")
419                        || raw_node.contains_key("command")
420                        || raw_node.contains_key("expect_status")
421                        || raw_node.contains_key("expect_text"))
422                {
423                    node.verify = Some(serde_json::json!({
424                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
425                        "command": raw_node.get("command").map(vm_value_to_json),
426                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
427                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
428                    }));
429                }
430                graph.nodes.insert(key.to_string(), node);
431            }
432        }
433        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
434            graph.entry = "act".to_string();
435        }
436        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
437            if graph.nodes.contains_key("verify") {
438                graph.edges.push(WorkflowEdge {
439                    from: "act".to_string(),
440                    to: "verify".to_string(),
441                    branch: None,
442                    label: None,
443                });
444            }
445            if graph.nodes.contains_key("repair") {
446                graph.edges.push(WorkflowEdge {
447                    from: "verify".to_string(),
448                    to: "repair".to_string(),
449                    branch: Some("failed".to_string()),
450                    label: None,
451                });
452                graph.edges.push(WorkflowEdge {
453                    from: "repair".to_string(),
454                    to: "verify".to_string(),
455                    branch: Some("retry".to_string()),
456                    label: None,
457                });
458            }
459        }
460    }
461
462    if graph.type_name.is_empty() {
463        graph.type_name = "workflow_graph".to_string();
464    }
465    if graph.id.is_empty() {
466        graph.id = new_id("workflow");
467    }
468    if graph.version == 0 {
469        graph.version = 1;
470    }
471    if graph.entry.is_empty() {
472        graph.entry = graph
473            .nodes
474            .keys()
475            .next()
476            .cloned()
477            .unwrap_or_else(|| "act".to_string());
478    }
479    for (node_id, node) in &mut graph.nodes {
480        if node.raw_tools.is_none() {
481            node.raw_tools = as_dict
482                .get("nodes")
483                .and_then(|nodes| nodes.as_dict())
484                .and_then(|nodes| nodes.get(node_id))
485                .and_then(|node_value| node_value.as_dict())
486                .and_then(|raw_node| raw_node.get("tools"))
487                .cloned();
488        }
489        if node.id.is_none() {
490            node.id = Some(node_id.clone());
491        }
492        if node.kind.is_empty() {
493            node.kind = "stage".to_string();
494        }
495        if node.join_policy.strategy.is_empty() {
496            node.join_policy.strategy = "all".to_string();
497        }
498        if node.reduce_policy.strategy.is_empty() {
499            node.reduce_policy.strategy = "concat".to_string();
500        }
501        if node.output_contract.output_kinds.is_empty() {
502            node.output_contract.output_kinds = vec![match node.kind.as_str() {
503                "verify" => "verification_result".to_string(),
504                "reduce" => node
505                    .reduce_policy
506                    .output_kind
507                    .clone()
508                    .unwrap_or_else(|| "summary".to_string()),
509                "map" => node
510                    .map_policy
511                    .output_kind
512                    .clone()
513                    .unwrap_or_else(|| "artifact".to_string()),
514                "escalation" => "plan".to_string(),
515                _ => "artifact".to_string(),
516            }];
517        }
518        if node.retry_policy.max_attempts == 0 {
519            node.retry_policy.max_attempts = 1;
520        }
521    }
522    Ok(graph)
523}
524
525pub fn validate_workflow(
526    graph: &WorkflowGraph,
527    ceiling: Option<&CapabilityPolicy>,
528) -> WorkflowValidationReport {
529    let mut errors = Vec::new();
530    let mut warnings = Vec::new();
531
532    if !graph.nodes.contains_key(&graph.entry) {
533        errors.push(format!("entry node does not exist: {}", graph.entry));
534    }
535
536    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
537    for edge in &graph.edges {
538        if !node_ids.contains(&edge.from) {
539            errors.push(format!("edge.from references unknown node: {}", edge.from));
540        }
541        if !node_ids.contains(&edge.to) {
542            errors.push(format!("edge.to references unknown node: {}", edge.to));
543        }
544    }
545
546    let reachable_nodes = reachable_nodes(graph);
547    for node_id in &node_ids {
548        if !reachable_nodes.contains(node_id) {
549            warnings.push(format!("node is unreachable: {node_id}"));
550        }
551    }
552
553    for (node_id, node) in &graph.nodes {
554        let incoming = graph
555            .edges
556            .iter()
557            .filter(|edge| edge.to == *node_id)
558            .count();
559        let outgoing: Vec<&WorkflowEdge> = graph
560            .edges
561            .iter()
562            .filter(|edge| edge.from == *node_id)
563            .collect();
564        if let Some(min_inputs) = node.input_contract.min_inputs {
565            if let Some(max_inputs) = node.input_contract.max_inputs {
566                if min_inputs > max_inputs {
567                    errors.push(format!(
568                        "node {node_id}: input contract min_inputs exceeds max_inputs"
569                    ));
570                }
571            }
572        }
573        match node.kind.as_str() {
574            "condition" => {
575                let has_true = outgoing
576                    .iter()
577                    .any(|edge| edge.branch.as_deref() == Some("true"));
578                let has_false = outgoing
579                    .iter()
580                    .any(|edge| edge.branch.as_deref() == Some("false"));
581                if !has_true || !has_false {
582                    errors.push(format!(
583                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
584                    ));
585                }
586            }
587            "fork" => {
588                if outgoing.len() < 2 {
589                    errors.push(format!(
590                        "node {node_id}: fork nodes require at least two outgoing edges"
591                    ));
592                }
593            }
594            "join" => {
595                if incoming < 2 {
596                    warnings.push(format!(
597                        "node {node_id}: join node has fewer than two incoming edges"
598                    ));
599                }
600            }
601            "map" => {
602                if node.map_policy.items.is_empty()
603                    && node.map_policy.item_artifact_kind.is_none()
604                    && node.input_contract.input_kinds.is_empty()
605                {
606                    errors.push(format!(
607                        "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
608                    ));
609                }
610            }
611            "reduce" => {
612                if node.input_contract.input_kinds.is_empty() {
613                    warnings.push(format!(
614                        "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
615                    ));
616                }
617            }
618            _ => {}
619        }
620    }
621
622    if let Some(ceiling) = ceiling {
623        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
624            errors.push(error);
625        }
626        for (node_id, node) in &graph.nodes {
627            if let Err(error) = ceiling.intersect(&node.capability_policy) {
628                errors.push(format!("node {node_id}: {error}"));
629            }
630        }
631    }
632
633    WorkflowValidationReport {
634        valid: errors.is_empty(),
635        errors,
636        warnings,
637        reachable_nodes: reachable_nodes.into_iter().collect(),
638    }
639}
640
641fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
642    let mut seen = BTreeSet::new();
643    let mut stack = vec![graph.entry.clone()];
644    while let Some(node_id) = stack.pop() {
645        if !seen.insert(node_id.clone()) {
646            continue;
647        }
648        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
649            stack.push(edge.to.clone());
650        }
651    }
652    seen
653}
654
655/// Pick the session id a stage should run under. Prefers an explicit
656/// `session_id` on the node's `model_policy` dict (so pipelines with
657/// `agent_session_open` / `agent_session_fork` flowing through a graph
658/// line up); falls back to a stable, node-derived id so multi-stage
659/// graphs with no explicit session share a conversation across stages.
660fn resolve_node_session_id(node: &WorkflowNode) -> String {
661    if let Some(explicit) = node
662        .raw_model_policy
663        .as_ref()
664        .and_then(|v| v.as_dict())
665        .and_then(|d| d.get("session_id"))
666        .and_then(|v| match v {
667            VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
668            _ => None,
669        })
670    {
671        return explicit;
672    }
673    format!("workflow_stage_{}", uuid::Uuid::now_v7())
674}
675
676pub async fn execute_stage_node(
677    node_id: &str,
678    node: &WorkflowNode,
679    task: &str,
680    artifacts: &[ArtifactRecord],
681) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
682    let mut selection_policy = node.context_policy.clone();
683    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
684        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
685    }
686    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
687    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
688    let stage_session_id = resolve_node_session_id(node);
689    if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
690        return Err(VmError::Runtime(format!(
691            "workflow stage {node_id} requires an existing session \
692             (call agent_session_open and feed session_id through model_policy \
693             before entering this stage)"
694        )));
695    }
696    if let Some(min_inputs) = node.input_contract.min_inputs {
697        if selected.len() < min_inputs {
698            return Err(VmError::Runtime(format!(
699                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
700            )));
701        }
702    }
703    if let Some(max_inputs) = node.input_contract.max_inputs {
704        if selected.len() > max_inputs {
705            return Err(VmError::Runtime(format!(
706                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
707            )));
708        }
709    }
710    let prompt = super::render_workflow_prompt(task, node.task_label.as_deref(), &rendered_context);
711
712    let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
713        .ok()
714        .filter(|value| !value.trim().is_empty())
715        .unwrap_or_else(|| {
716            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
717            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
718            crate::llm_config::default_tool_format(&model, &provider)
719        });
720    let mut llm_result = if node.kind == "verify" {
721        if let Some(command) = node
722            .verify
723            .as_ref()
724            .and_then(|verify| verify.as_object())
725            .and_then(|verify| verify.get("command"))
726            .and_then(|value| value.as_str())
727            .map(str::trim)
728            .filter(|value| !value.is_empty())
729        {
730            let mut process = if cfg!(target_os = "windows") {
731                let mut cmd = tokio::process::Command::new("cmd");
732                cmd.arg("/C").arg(command);
733                cmd
734            } else {
735                let mut cmd = tokio::process::Command::new("/bin/sh");
736                cmd.arg("-lc").arg(command);
737                cmd
738            };
739            process.stdin(std::process::Stdio::null());
740            if let Some(context) = crate::stdlib::process::current_execution_context() {
741                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
742                    process.current_dir(cwd);
743                }
744                if !context.env.is_empty() {
745                    process.envs(context.env);
746                }
747            }
748            let output = process
749                .output()
750                .await
751                .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
752            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
753            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
754            let combined = if stderr.is_empty() {
755                stdout.clone()
756            } else if stdout.is_empty() {
757                stderr.clone()
758            } else {
759                format!("{stdout}\n{stderr}")
760            };
761            serde_json::json!({
762                "status": "completed",
763                "text": combined,
764                "visible_text": combined,
765                "command": command,
766                "stdout": stdout,
767                "stderr": stderr,
768                "exit_status": output.status.code().unwrap_or(-1),
769                "success": output.status.success(),
770            })
771        } else {
772            serde_json::json!({
773                "status": "completed",
774                "text": "",
775                "visible_text": "",
776            })
777        }
778    } else {
779        let mut options = BTreeMap::new();
780        if let Some(provider) = &node.model_policy.provider {
781            options.insert(
782                "provider".to_string(),
783                VmValue::String(Rc::from(provider.clone())),
784            );
785        }
786        if let Some(model) = &node.model_policy.model {
787            options.insert(
788                "model".to_string(),
789                VmValue::String(Rc::from(model.clone())),
790            );
791        }
792        if let Some(model_tier) = &node.model_policy.model_tier {
793            options.insert(
794                "model_tier".to_string(),
795                VmValue::String(Rc::from(model_tier.clone())),
796            );
797        }
798        if let Some(temperature) = node.model_policy.temperature {
799            options.insert("temperature".to_string(), VmValue::Float(temperature));
800        }
801        if let Some(max_tokens) = node.model_policy.max_tokens {
802            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
803        }
804        let tool_names = workflow_tool_names(&node.tools);
805        let tools_value = node.raw_tools.clone().or_else(|| {
806            if matches!(node.tools, serde_json::Value::Null) {
807                None
808            } else {
809                Some(crate::stdlib::json_to_vm_value(&node.tools))
810            }
811        });
812        if tools_value.is_some() && !tool_names.is_empty() {
813            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
814        }
815        options.insert(
816            "session_id".to_string(),
817            VmValue::String(Rc::from(stage_session_id.clone())),
818        );
819
820        let args = vec![
821            VmValue::String(Rc::from(prompt.clone())),
822            node.system
823                .clone()
824                .map(|s| VmValue::String(Rc::from(s)))
825                .unwrap_or(VmValue::Nil),
826            VmValue::Dict(Rc::new(options)),
827        ];
828        let mut opts = extract_llm_options(&args)?;
829
830        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
831            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
832            let effective_policy = tool_policy
833                .intersect(&node.capability_policy)
834                .map_err(VmError::Runtime)?;
835            let auto_compact = if node.auto_compact.enabled {
836                let mut ac = crate::orchestration::AutoCompactConfig::default();
837                if let Some(v) = node.auto_compact.token_threshold {
838                    ac.token_threshold = v;
839                }
840                if let Some(v) = node.auto_compact.tool_output_max_chars {
841                    ac.tool_output_max_chars = v;
842                }
843                if let Some(ref strategy) = node.auto_compact.compact_strategy {
844                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
845                        ac.compact_strategy = s;
846                    }
847                }
848                if let Some(v) = node.auto_compact.hard_limit_tokens {
849                    ac.hard_limit_tokens = Some(v);
850                }
851                if let Some(ref strategy) = node.auto_compact.hard_limit_strategy {
852                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
853                        ac.hard_limit_strategy = s;
854                    }
855                }
856                // Closure fields can't round-trip through serde, so extract them
857                // directly from the raw VmValue dict.
858                if let Some(ref raw_ac) = node.raw_auto_compact {
859                    if let Some(dict) = raw_ac.as_dict() {
860                        if let Some(cb) = dict.get("compress_callback") {
861                            ac.compress_callback = Some(cb.clone());
862                        }
863                        if let Some(cb) = dict.get("mask_callback") {
864                            ac.mask_callback = Some(cb.clone());
865                        }
866                        if let Some(cb) = dict.get("custom_compactor") {
867                            ac.custom_compactor = Some(cb.clone());
868                        }
869                    }
870                }
871                {
872                    let user_specified_threshold = node.auto_compact.token_threshold.is_some();
873                    let user_specified_hard_limit = node.auto_compact.hard_limit_tokens.is_some();
874                    crate::llm::api::adapt_auto_compact_to_provider(
875                        &mut ac,
876                        user_specified_threshold,
877                        user_specified_hard_limit,
878                        &opts.provider,
879                        &opts.model,
880                        &opts.api_key,
881                    )
882                    .await;
883                }
884                Some(ac)
885            } else {
886                None
887            };
888            crate::llm::run_agent_loop_internal(
889                &mut opts,
890                crate::llm::AgentLoopConfig {
891                    persistent: true,
892                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
893                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
894                    nudge: node.model_policy.nudge.clone(),
895                    done_sentinel: node.done_sentinel.clone(),
896                    break_unless_phase: None,
897                    tool_retries: 0,
898                    tool_backoff_ms: 1000,
899                    tool_format: tool_format.clone(),
900                    auto_compact,
901                    policy: Some(effective_policy),
902                    approval_policy: Some(node.approval_policy.clone()),
903                    daemon: false,
904                    daemon_config: Default::default(),
905                    llm_retries: 2,
906                    llm_backoff_ms: 2000,
907                    exit_when_verified: node.exit_when_verified,
908                    loop_detect_warn: 2,
909                    loop_detect_block: 3,
910                    loop_detect_skip: 4,
911                    tool_examples: node.model_policy.tool_examples.clone(),
912                    turn_policy: node.model_policy.turn_policy.clone(),
913                    stop_after_successful_tools: node
914                        .model_policy
915                        .stop_after_successful_tools
916                        .clone(),
917                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
918                    // Use the same session id resolved for the stage so
919                    // agent_subscribe handlers keyed on it, and session
920                    // storage lookups in the agent loop, stay consistent.
921                    session_id: stage_session_id.clone(),
922                    event_sink: None,
923                    // Seed from the stage's explicit deliverables/ledger so the
924                    // graph carries a task-wide plan through map branches and
925                    // nested stages. Empty ledger means no gate.
926                    task_ledger: node
927                        .raw_model_policy
928                        .as_ref()
929                        .and_then(|v| v.as_dict())
930                        .and_then(|d| d.get("task_ledger"))
931                        .map(crate::llm::helpers::vm_value_to_json)
932                        .and_then(|json| serde_json::from_value(json).ok())
933                        .unwrap_or_default(),
934                    post_turn_callback: node
935                        .raw_model_policy
936                        .as_ref()
937                        .and_then(|v| v.as_dict())
938                        .and_then(|d| d.get("post_turn_callback"))
939                        .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
940                        .cloned(),
941                },
942            )
943            .await?
944        } else {
945            let result = vm_call_llm_full(&opts).await?;
946            crate::llm::agent_loop_result_from_llm(&result, opts)
947        }
948    };
949    if let Some(payload) = llm_result.as_object_mut() {
950        payload.insert("prompt".to_string(), serde_json::json!(prompt));
951        payload.insert(
952            "system_prompt".to_string(),
953            serde_json::json!(node.system.clone().unwrap_or_default()),
954        );
955        payload.insert(
956            "rendered_context".to_string(),
957            serde_json::json!(rendered_context),
958        );
959        payload.insert(
960            "selected_artifact_ids".to_string(),
961            serde_json::json!(selected
962                .iter()
963                .map(|artifact| artifact.id.clone())
964                .collect::<Vec<_>>()),
965        );
966        payload.insert(
967            "selected_artifact_titles".to_string(),
968            serde_json::json!(selected
969                .iter()
970                .map(|artifact| artifact.title.clone())
971                .collect::<Vec<_>>()),
972        );
973        payload.insert(
974            "tool_calling_mode".to_string(),
975            serde_json::json!(tool_format.clone()),
976        );
977    }
978
979    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
980    // Non-LLM stages (verify command, condition, fork, join, ...) don't produce
981    // a "transcript" field; fall back to the input so cross-stage conversation
982    // state survives transitions.
983    let result_transcript = llm_result
984        .get("transcript")
985        .cloned()
986        .map(|value| crate::stdlib::json_to_vm_value(&value));
987    let session_transcript = crate::agent_sessions::snapshot(&stage_session_id);
988    let transcript = result_transcript
989        .or(session_transcript)
990        .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
991    let output_kind = node
992        .output_contract
993        .output_kinds
994        .first()
995        .cloned()
996        .unwrap_or_else(|| {
997            if node.kind == "verify" {
998                "verification_result".to_string()
999            } else {
1000                "artifact".to_string()
1001            }
1002        });
1003    let mut metadata = BTreeMap::new();
1004    metadata.insert(
1005        "input_artifact_ids".to_string(),
1006        serde_json::json!(selected
1007            .iter()
1008            .map(|artifact| artifact.id.clone())
1009            .collect::<Vec<_>>()),
1010    );
1011    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1012    let artifact = ArtifactRecord {
1013        type_name: "artifact".to_string(),
1014        id: new_id("artifact"),
1015        kind: output_kind,
1016        title: Some(format!("stage {node_id} output")),
1017        text: Some(visible_text),
1018        data: Some(llm_result.clone()),
1019        source: Some(node_id.to_string()),
1020        created_at: now_rfc3339(),
1021        freshness: Some("fresh".to_string()),
1022        priority: None,
1023        lineage: selected
1024            .iter()
1025            .map(|artifact| artifact.id.clone())
1026            .collect(),
1027        relevance: Some(1.0),
1028        estimated_tokens: None,
1029        stage: Some(node_id.to_string()),
1030        metadata,
1031    }
1032    .normalize();
1033
1034    Ok((llm_result, vec![artifact], transcript))
1035}
1036
1037pub fn next_nodes_for(
1038    graph: &WorkflowGraph,
1039    current: &str,
1040    branch: Option<&str>,
1041) -> Vec<WorkflowEdge> {
1042    let mut matching: Vec<WorkflowEdge> = graph
1043        .edges
1044        .iter()
1045        .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1046        .cloned()
1047        .collect();
1048    if matching.is_empty() {
1049        matching = graph
1050            .edges
1051            .iter()
1052            .filter(|edge| edge.from == current && edge.branch.is_none())
1053            .cloned()
1054            .collect();
1055    }
1056    matching
1057}
1058
1059pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1060    next_nodes_for(graph, current, Some(branch))
1061        .into_iter()
1062        .next()
1063        .map(|edge| edge.to)
1064}
1065
1066pub fn append_audit_entry(
1067    graph: &mut WorkflowGraph,
1068    op: &str,
1069    node_id: Option<String>,
1070    reason: Option<String>,
1071    metadata: BTreeMap<String, serde_json::Value>,
1072) {
1073    graph.audit_log.push(WorkflowAuditEntry {
1074        id: new_id("audit"),
1075        op: op.to_string(),
1076        node_id,
1077        timestamp: now_rfc3339(),
1078        reason,
1079        metadata,
1080    });
1081}