1use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10 BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11 ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
/// Key in [`WorkflowNode::metadata`] under which workflow-level verification
/// contracts are stored. Written by `inject_workflow_verification_contracts`
/// and read back by `stage_verification_contracts`.
pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
/// Key in [`WorkflowNode::metadata`] that selects the verification scope of a
/// stage. When its value is the string `"local_only"`,
/// `stage_verification_contracts` returns only the node's own contract.
pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
19
/// A single node of a workflow graph.
///
/// The container-level `#[serde(default)]` means every field may be omitted
/// on input and is then filled with its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier; `normalize_workflow_value` sets it to the node's map
    /// key when it is missing.
    pub id: Option<String>,
    /// Node kind. `normalize_workflow_value` replaces an empty kind with
    /// `"stage"`; `validate_workflow` applies extra rules to `"condition"`,
    /// `"fork"`, `"join"`, `"map"` and `"reduce"`.
    pub kind: String,
    /// In the `act`/`verify`/`repair` shorthand, inherited from the
    /// workflow-level `mode` when unset.
    pub mode: Option<String>,
    pub prompt: Option<String>,
    pub system: Option<String>,
    /// Passed to `render_workflow_prompt` by `execute_stage_node`.
    pub task_label: Option<String>,
    /// In the `act`/`verify`/`repair` shorthand, inherited from the
    /// workflow-level `done_sentinel` when unset.
    pub done_sentinel: Option<String>,
    /// Tool declarations as JSON. See `workflow_tool_names` for the shapes the
    /// helpers in this file understand (presumably applied to this field by
    /// callers — TODO confirm).
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    pub auto_compact: AutoCompactPolicy,
    #[serde(default)]
    pub output_visibility: Option<String>,
    pub context_policy: ContextPolicy,
    pub retry_policy: RetryPolicy,
    /// Checked against the ceiling policy in `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub input_contract: StageContract,
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification settings; turned into a [`VerificationContract`] by
    /// `verification_contract_from_verify`.
    pub verify: Option<serde_json::Value>,
    #[serde(default)]
    pub exit_when_verified: bool,
    /// Free-form metadata. This file uses the keys
    /// `WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY` and
    /// `WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY`.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Original VM value of `tools`, captured by `parse_workflow_node_value`.
    /// Never serialized or deserialized.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Original VM value of `auto_compact`, captured by
    /// `parse_workflow_node_value`. Never serialized or deserialized.
    #[serde(skip)]
    pub raw_auto_compact: Option<VmValue>,
    /// Original VM value of `model_policy`, captured by
    /// `parse_workflow_node_value`. Read in this file for the `skills`,
    /// `skill_match` and `session_id` entries. Never serialized or deserialized.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
}
74
75impl PartialEq for WorkflowNode {
76 fn eq(&self, other: &Self) -> bool {
77 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
78 }
79}
80
/// One concrete check belonging to a [`VerificationContract`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationRequirement {
    /// Check category. Kinds generated in this file are
    /// `"visible_text_contains"`, `"combined_output_contains"`,
    /// `"expect_status"`, `"identifier"`, `"path"` and `"text"`; user-supplied
    /// checks may carry any non-empty kind.
    pub kind: String,
    /// The value the check refers to (stored trimmed by `push_unique_requirement`).
    pub value: String,
    /// Optional explanatory note; blank notes are stored as `None`.
    pub note: Option<String>,
}
88
/// Verification expectations collected for a workflow node.
///
/// Built by `verification_contract_from_verify` from a contract file, an
/// inline `contract` object and the `verify` object itself, in that order;
/// for scalar fields the first non-empty value wins.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationContract {
    /// Id of the node whose `verify` block produced this contract. Ignored by
    /// `is_empty`.
    pub source_node: Option<String>,
    pub summary: Option<String>,
    pub command: Option<String>,
    /// Also expanded into an `"expect_status"` entry in `checks`.
    pub expect_status: Option<i64>,
    /// Also expanded into a `"visible_text_contains"` entry in `checks`.
    pub assert_text: Option<String>,
    /// Also expanded into a `"combined_output_contains"` entry in `checks`.
    pub expect_text: Option<String>,
    /// Read from `required_identifiers` or, failing that, `identifiers`; each
    /// entry is also expanded into an `"identifier"` check.
    pub required_identifiers: Vec<String>,
    /// Read from `required_paths` or, failing that, `paths`; each entry is
    /// also expanded into a `"path"` check.
    pub required_paths: Vec<String>,
    /// Read from `required_text`, `exact_text` or `required_strings` (first
    /// key present); each entry is also expanded into a `"text"` check.
    pub required_text: Vec<String>,
    pub notes: Vec<String>,
    /// Explicit checks from the input plus the checks derived from the fields
    /// above, without exact duplicates.
    pub checks: Vec<VerificationRequirement>,
}
104
105impl VerificationContract {
106 fn is_empty(&self) -> bool {
107 self.summary.is_none()
108 && self.command.is_none()
109 && self.expect_status.is_none()
110 && self.assert_text.is_none()
111 && self.expect_text.is_none()
112 && self.required_identifiers.is_empty()
113 && self.required_paths.is_empty()
114 && self.required_text.is_empty()
115 && self.notes.is_empty()
116 && self.checks.is_empty()
117 }
118}
119
/// Appends the trimmed form of `value` to `values` unless it is blank or an
/// identical entry is already present.
fn push_unique_string(values: &mut Vec<String>, value: &str) {
    let candidate = value.trim();
    if candidate.is_empty() || values.iter().any(|existing| existing == candidate) {
        return;
    }
    values.push(candidate.to_owned());
}
129
130fn push_unique_requirement(
131 values: &mut Vec<VerificationRequirement>,
132 kind: &str,
133 value: &str,
134 note: Option<&str>,
135) {
136 let trimmed_kind = kind.trim();
137 let trimmed_value = value.trim();
138 let trimmed_note = note
139 .map(str::trim)
140 .filter(|candidate| !candidate.is_empty())
141 .map(|candidate| candidate.to_string());
142 if trimmed_kind.is_empty() || trimmed_value.is_empty() {
143 return;
144 }
145 let candidate = VerificationRequirement {
146 kind: trimmed_kind.to_string(),
147 value: trimmed_value.to_string(),
148 note: trimmed_note,
149 };
150 if !values.iter().any(|existing| existing == &candidate) {
151 values.push(candidate);
152 }
153}
154
155fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
156 match value {
157 Some(serde_json::Value::String(text)) => {
158 let mut values = Vec::new();
159 push_unique_string(&mut values, text);
160 values
161 }
162 Some(serde_json::Value::Array(items)) => {
163 let mut values = Vec::new();
164 for item in items {
165 if let Some(text) = item.as_str() {
166 push_unique_string(&mut values, text);
167 }
168 }
169 values
170 }
171 _ => Vec::new(),
172 }
173}
174
175fn merge_verification_requirement_list(
176 target: &mut Vec<VerificationRequirement>,
177 value: Option<&serde_json::Value>,
178) {
179 let Some(items) = value.and_then(|raw| raw.as_array()) else {
180 return;
181 };
182 for item in items {
183 let Some(object) = item.as_object() else {
184 continue;
185 };
186 let kind = object
187 .get("kind")
188 .and_then(|value| value.as_str())
189 .unwrap_or_default();
190 let value = object
191 .get("value")
192 .and_then(|value| value.as_str())
193 .unwrap_or_default();
194 let note = object
195 .get("note")
196 .or_else(|| object.get("description"))
197 .or_else(|| object.get("reason"))
198 .and_then(|value| value.as_str());
199 push_unique_requirement(target, kind, value, note);
200 }
201}
202
203fn merge_verification_contract_fields(
204 target: &mut VerificationContract,
205 object: &serde_json::Map<String, serde_json::Value>,
206) {
207 if target.summary.is_none() {
208 target.summary = object
209 .get("summary")
210 .and_then(|value| value.as_str())
211 .map(str::trim)
212 .filter(|value| !value.is_empty())
213 .map(|value| value.to_string());
214 }
215 if target.command.is_none() {
216 target.command = object
217 .get("command")
218 .and_then(|value| value.as_str())
219 .map(str::trim)
220 .filter(|value| !value.is_empty())
221 .map(|value| value.to_string());
222 }
223 if target.expect_status.is_none() {
224 target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
225 }
226 if target.assert_text.is_none() {
227 target.assert_text = object
228 .get("assert_text")
229 .and_then(|value| value.as_str())
230 .map(str::trim)
231 .filter(|value| !value.is_empty())
232 .map(|value| value.to_string());
233 }
234 if target.expect_text.is_none() {
235 target.expect_text = object
236 .get("expect_text")
237 .and_then(|value| value.as_str())
238 .map(str::trim)
239 .filter(|value| !value.is_empty())
240 .map(|value| value.to_string());
241 }
242
243 for value in json_string_list(
244 object
245 .get("required_identifiers")
246 .or_else(|| object.get("identifiers")),
247 ) {
248 push_unique_string(&mut target.required_identifiers, &value);
249 }
250 for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
251 push_unique_string(&mut target.required_paths, &value);
252 }
253 for value in json_string_list(
254 object
255 .get("required_text")
256 .or_else(|| object.get("exact_text"))
257 .or_else(|| object.get("required_strings")),
258 ) {
259 push_unique_string(&mut target.required_text, &value);
260 }
261 for value in json_string_list(object.get("notes")) {
262 push_unique_string(&mut target.notes, &value);
263 }
264 merge_verification_requirement_list(&mut target.checks, object.get("checks"));
265}
266
267fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
268 let resolved = crate::stdlib::process::resolve_source_asset_path(path);
269 let contents = std::fs::read_to_string(&resolved).map_err(|error| {
270 VmError::Runtime(format!(
271 "workflow verification contract read failed for {}: {error}",
272 resolved.display()
273 ))
274 })?;
275 serde_json::from_str(&contents).map_err(|error| {
276 VmError::Runtime(format!(
277 "workflow verification contract parse failed for {}: {error}",
278 resolved.display()
279 ))
280 })
281}
282
283fn resolve_verification_contract_path(
284 verify: &serde_json::Map<String, serde_json::Value>,
285) -> Result<Option<serde_json::Value>, VmError> {
286 let Some(path) = verify
287 .get("contract_path")
288 .or_else(|| verify.get("verification_contract_path"))
289 .and_then(|value| value.as_str())
290 .map(str::trim)
291 .filter(|value| !value.is_empty())
292 else {
293 return Ok(None);
294 };
295 Ok(Some(load_verification_contract_file(path)?))
296}
297
298pub fn verification_contract_from_verify(
299 node_id: &str,
300 verify: Option<&serde_json::Value>,
301) -> Result<Option<VerificationContract>, VmError> {
302 let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
303 return Ok(None);
304 };
305
306 let mut contract = VerificationContract {
307 source_node: Some(node_id.to_string()),
308 ..Default::default()
309 };
310
311 if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
312 let Some(object) = file_contract.as_object() else {
313 return Err(VmError::Runtime(
314 "workflow verification contract file must parse to a JSON object".to_string(),
315 ));
316 };
317 merge_verification_contract_fields(&mut contract, object);
318 }
319
320 if let Some(inline_contract) = verify_object.get("contract") {
321 let Some(object) = inline_contract.as_object() else {
322 return Err(VmError::Runtime(
323 "workflow verify.contract must be an object".to_string(),
324 ));
325 };
326 merge_verification_contract_fields(&mut contract, object);
327 }
328
329 merge_verification_contract_fields(&mut contract, verify_object);
330
331 if let Some(assert_text) = contract.assert_text.clone() {
332 push_unique_requirement(
333 &mut contract.checks,
334 "visible_text_contains",
335 &assert_text,
336 Some("verify stage requires visible output to contain this text"),
337 );
338 }
339 if let Some(expect_text) = contract.expect_text.clone() {
340 push_unique_requirement(
341 &mut contract.checks,
342 "combined_output_contains",
343 &expect_text,
344 Some("verify command requires combined stdout/stderr to contain this text"),
345 );
346 }
347 if let Some(expect_status) = contract.expect_status {
348 push_unique_requirement(
349 &mut contract.checks,
350 "expect_status",
351 &expect_status.to_string(),
352 Some("verify command exit status must match exactly"),
353 );
354 }
355 for identifier in contract.required_identifiers.clone() {
356 push_unique_requirement(
357 &mut contract.checks,
358 "identifier",
359 &identifier,
360 Some("use this exact identifier spelling"),
361 );
362 }
363 for path in contract.required_paths.clone() {
364 push_unique_requirement(
365 &mut contract.checks,
366 "path",
367 &path,
368 Some("preserve this exact path"),
369 );
370 }
371 for text in contract.required_text.clone() {
372 push_unique_requirement(
373 &mut contract.checks,
374 "text",
375 &text,
376 Some("required exact text or wiring snippet"),
377 );
378 }
379
380 if contract.is_empty() {
381 return Ok(None);
382 }
383 Ok(Some(contract))
384}
385
386fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
387 if !values.iter().any(|existing| existing == &candidate) {
388 values.push(candidate);
389 }
390}
391
392pub fn workflow_verification_contracts(
393 graph: &WorkflowGraph,
394) -> Result<Vec<VerificationContract>, VmError> {
395 let mut contracts = Vec::new();
396 for (node_id, node) in &graph.nodes {
397 if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
398 push_unique_contract(&mut contracts, contract);
399 }
400 }
401 Ok(contracts)
402}
403
404pub fn inject_workflow_verification_contracts(
405 node: &mut WorkflowNode,
406 contracts: &[VerificationContract],
407) {
408 if contracts.is_empty() {
409 return;
410 }
411 node.metadata.insert(
412 WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
413 serde_json::to_value(contracts).unwrap_or_default(),
414 );
415}
416
417pub fn stage_verification_contracts(
418 node_id: &str,
419 node: &WorkflowNode,
420) -> Result<Vec<VerificationContract>, VmError> {
421 let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
422 let local_only = matches!(
423 node.metadata
424 .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
425 .and_then(|value| value.as_str()),
426 Some("local_only")
427 );
428 if local_only {
429 return Ok(local_contract.into_iter().collect());
430 }
431
432 let mut contracts = node
433 .metadata
434 .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
435 .cloned()
436 .map(|value| {
437 serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
438 VmError::Runtime(format!(
439 "workflow stage {node_id} verification contract metadata parse failed: {error}"
440 ))
441 })
442 })
443 .transpose()?
444 .unwrap_or_default();
445
446 if let Some(local_contract) = local_contract {
447 push_unique_contract(&mut contracts, local_contract);
448 }
449 Ok(contracts)
450}
451
452pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
453 match value {
454 serde_json::Value::Null => Vec::new(),
455 serde_json::Value::Array(items) => items
456 .iter()
457 .filter_map(|item| match item {
458 serde_json::Value::Object(map) => map
459 .get("name")
460 .and_then(|value| value.as_str())
461 .filter(|name| !name.is_empty())
462 .map(|name| name.to_string()),
463 _ => None,
464 })
465 .collect(),
466 serde_json::Value::Object(map) => {
467 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
468 return map
469 .get("tools")
470 .map(workflow_tool_names)
471 .unwrap_or_default();
472 }
473 map.get("name")
474 .and_then(|value| value.as_str())
475 .filter(|name| !name.is_empty())
476 .map(|name| vec![name.to_string()])
477 .unwrap_or_default()
478 }
479 _ => Vec::new(),
480 }
481}
482
/// Picks the most severe side-effect level from `levels`.
///
/// Known levels rank, from least to most severe: `none`, `read_only`,
/// `workspace_write`, `process_exec`, `network`. Any other string ranks above
/// all of them. When several entries share the highest rank the last one is
/// returned; an empty input yields `None`.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const ORDERED: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];

    let mut strongest: Option<(usize, String)> = None;
    for level in levels {
        let score = ORDERED
            .iter()
            .position(|known| *known == level.as_str())
            .unwrap_or(ORDERED.len());
        let replaces = match &strongest {
            // `>=` keeps the later entry on ties.
            Some((best, _)) => score >= *best,
            None => true,
        };
        if replaces {
            strongest = Some((score, level));
        }
    }
    strongest.map(|(_, level)| level)
}
496
497fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
498 match value.and_then(|v| v.as_str()).unwrap_or("") {
499 "read" => ToolKind::Read,
500 "edit" => ToolKind::Edit,
501 "delete" => ToolKind::Delete,
502 "move" => ToolKind::Move,
503 "search" => ToolKind::Search,
504 "execute" => ToolKind::Execute,
505 "think" => ToolKind::Think,
506 "fetch" => ToolKind::Fetch,
507 _ => ToolKind::Other,
508 }
509}
510
511fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
512 let policy = map
513 .get("policy")
514 .and_then(|value| value.as_object())
515 .cloned()
516 .unwrap_or_default();
517
518 let capabilities = policy
519 .get("capabilities")
520 .and_then(|value| value.as_object())
521 .map(|caps| {
522 caps.iter()
523 .map(|(capability, ops)| {
524 let values = ops
525 .as_array()
526 .map(|items| {
527 items
528 .iter()
529 .filter_map(|item| item.as_str().map(|s| s.to_string()))
530 .collect::<Vec<_>>()
531 })
532 .unwrap_or_default();
533 (capability.clone(), values)
534 })
535 .collect::<BTreeMap<_, _>>()
536 })
537 .unwrap_or_default();
538
539 let arg_schema = if let Some(schema) = policy.get("arg_schema") {
542 serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
543 } else {
544 ToolArgSchema {
545 path_params: policy
546 .get("path_params")
547 .and_then(|value| value.as_array())
548 .map(|items| {
549 items
550 .iter()
551 .filter_map(|item| item.as_str().map(|s| s.to_string()))
552 .collect::<Vec<_>>()
553 })
554 .unwrap_or_default(),
555 arg_aliases: policy
556 .get("arg_aliases")
557 .and_then(|value| value.as_object())
558 .map(|aliases| {
559 aliases
560 .iter()
561 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
562 .collect::<BTreeMap<_, _>>()
563 })
564 .unwrap_or_default(),
565 required: policy
566 .get("required")
567 .and_then(|value| value.as_array())
568 .map(|items| {
569 items
570 .iter()
571 .filter_map(|item| item.as_str().map(|s| s.to_string()))
572 .collect::<Vec<_>>()
573 })
574 .unwrap_or_default(),
575 }
576 };
577
578 let kind = parse_tool_kind(policy.get("kind"));
579 let side_effect_level = policy
580 .get("side_effect_level")
581 .and_then(|value| value.as_str())
582 .map(SideEffectLevel::parse)
583 .unwrap_or_default();
584
585 ToolAnnotations {
586 kind,
587 side_effect_level,
588 arg_schema,
589 capabilities,
590 }
591}
592
593pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
594 match value {
595 serde_json::Value::Null => BTreeMap::new(),
596 serde_json::Value::Array(items) => items
597 .iter()
598 .filter_map(|item| match item {
599 serde_json::Value::Object(map) => map
600 .get("name")
601 .and_then(|value| value.as_str())
602 .filter(|name| !name.is_empty())
603 .map(|name| (name.to_string(), parse_tool_annotations(map))),
604 _ => None,
605 })
606 .collect(),
607 serde_json::Value::Object(map) => {
608 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
609 return map
610 .get("tools")
611 .map(workflow_tool_annotations)
612 .unwrap_or_default();
613 }
614 map.get("name")
615 .and_then(|value| value.as_str())
616 .filter(|name| !name.is_empty())
617 .map(|name| {
618 let mut annotations = BTreeMap::new();
619 annotations.insert(name.to_string(), parse_tool_annotations(map));
620 annotations
621 })
622 .unwrap_or_default()
623 }
624 _ => BTreeMap::new(),
625 }
626}
627
628pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
629 let tools = workflow_tool_names(value);
630 let tool_annotations = workflow_tool_annotations(value);
631 let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
632 for annotations in tool_annotations.values() {
633 for (capability, ops) in &annotations.capabilities {
634 let entry = capabilities.entry(capability.clone()).or_default();
635 for op in ops {
636 if !entry.contains(op) {
637 entry.push(op.clone());
638 }
639 }
640 entry.sort();
641 }
642 }
643 let side_effect_level = max_side_effect_level(
644 tool_annotations
645 .values()
646 .map(|annotations| annotations.side_effect_level.as_str().to_string())
647 .filter(|level| level != "none"),
648 );
649 CapabilityPolicy {
650 tools,
651 capabilities,
652 workspace_roots: Vec::new(),
653 side_effect_level,
654 recursion_limit: None,
655 tool_arg_constraints: Vec::new(),
656 tool_annotations,
657 }
658}
659
/// A directed edge between two nodes of a [`WorkflowGraph`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the source node.
    pub from: String,
    /// Id of the target node.
    pub to: String,
    /// Optional branch name. `validate_workflow` requires `"true"` and
    /// `"false"` branches on condition nodes; `normalize_workflow_value`
    /// generates `"failed"` and `"retry"` branches for the shorthand form.
    pub branch: Option<String>,
    pub label: Option<String>,
}
668
/// A workflow: a set of nodes connected by edges, plus graph-level policies.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Serialized as `_type`; `normalize_workflow_value` sets it to
    /// `"workflow_graph"` when empty.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph id; `normalize_workflow_value` generates one with
    /// `new_id("workflow")` when empty.
    pub id: String,
    pub name: Option<String>,
    /// `normalize_workflow_value` raises a version of `0` to `1`.
    pub version: usize,
    /// Id of the entry node; `validate_workflow` reports an error when it is
    /// not a key of `nodes`.
    pub entry: String,
    /// Nodes keyed by node id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    /// Checked against the ceiling policy in `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}
685
/// One entry of [`WorkflowGraph::audit_log`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    /// Name of the recorded operation.
    pub op: String,
    /// Node the operation applied to, if any.
    pub node_id: Option<String>,
    /// NOTE(review): presumably an RFC 3339 timestamp (the file imports
    /// `now_rfc3339`) — confirm against the code that writes entries.
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
696
/// Result of `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// `true` exactly when `errors` is empty; warnings do not affect it.
    pub valid: bool,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
    /// Ids reached from the entry node by following edges, in sorted order.
    pub reachable_nodes: Vec<String>,
}
705
706pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
707 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
708 let dict = value.as_dict();
709 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
710 node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
711 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
712 Ok(node)
713}
714
715pub fn parse_workflow_node_json(
716 json: serde_json::Value,
717 label: &str,
718) -> Result<WorkflowNode, VmError> {
719 super::parse_json_payload(json, label)
720}
721
722pub fn parse_workflow_edge_json(
723 json: serde_json::Value,
724 label: &str,
725) -> Result<WorkflowEdge, VmError> {
726 super::parse_json_payload(json, label)
727}
728
/// Parses a VM value into a [`WorkflowGraph`] and fills in defaults.
///
/// Two input forms are handled:
/// - a graph that already has `nodes`;
/// - a shorthand with top-level `act`, `verify` and/or `repair` entries,
///   used only when the parsed graph has no nodes. Each entry becomes a node,
///   inherits unset model settings, `mode` and `done_sentinel` from the
///   top-level dict, and default edges are generated.
///
/// Afterwards graph-level defaults (`_type`, id, version, entry) and per-node
/// defaults (id, kind, join/reduce strategy, output kinds, retry attempts)
/// are applied.
///
/// # Errors
///
/// Propagates errors from `super::parse_json_value` and
/// `parse_workflow_node_value`.
pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
    // Raw VM dict of the whole workflow; empty when `value` is not a dict.
    let as_dict = value.as_dict().cloned().unwrap_or_default();

    // Shorthand form: build nodes from top-level `act` / `verify` / `repair`.
    if graph.nodes.is_empty() {
        for key in ["act", "verify", "repair"] {
            if let Some(node_value) = as_dict.get(key) {
                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
                // The shorthand key always becomes the node id.
                node.id = Some(key.to_string());
                if node.kind.is_empty() {
                    node.kind = if key == "verify" {
                        "verify".to_string()
                    } else {
                        "stage".to_string()
                    };
                }
                // Model settings not set on the node fall back to the
                // workflow-level values.
                if node.model_policy.provider.is_none() {
                    node.model_policy.provider = as_dict
                        .get("provider")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model.is_none() {
                    node.model_policy.model = as_dict
                        .get("model")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model_tier.is_none() {
                    // `tier` is accepted as an alias for `model_tier`.
                    node.model_policy.model_tier = as_dict
                        .get("model_tier")
                        .or_else(|| as_dict.get("tier"))
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.temperature.is_none() {
                    // Accepts a float, or an integer converted to a float.
                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
                        if let VmValue::Float(number) = value {
                            Some(*number)
                        } else {
                            value.as_int().map(|number| number as f64)
                        }
                    });
                }
                if node.model_policy.max_tokens.is_none() {
                    node.model_policy.max_tokens =
                        as_dict.get("max_tokens").and_then(|value| value.as_int());
                }
                if node.mode.is_none() {
                    node.mode = as_dict
                        .get("mode")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.done_sentinel.is_none() {
                    node.done_sentinel = as_dict
                        .get("done_sentinel")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                // A `verify` entry may put its verification settings directly
                // on the node; gather them into a `verify` object. Keys that
                // are absent are written as JSON null.
                if key == "verify"
                    && node.verify.is_none()
                    && (raw_node.contains_key("assert_text")
                        || raw_node.contains_key("command")
                        || raw_node.contains_key("expect_status")
                        || raw_node.contains_key("expect_text"))
                {
                    node.verify = Some(serde_json::json!({
                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
                        "command": raw_node.get("command").map(vm_value_to_json),
                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
                    }));
                }
                graph.nodes.insert(key.to_string(), node);
            }
        }
        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
            graph.entry = "act".to_string();
        }
        // Default wiring: act -> verify, verify -(failed)-> repair,
        // repair -(retry)-> verify.
        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
            if graph.nodes.contains_key("verify") {
                graph.edges.push(WorkflowEdge {
                    from: "act".to_string(),
                    to: "verify".to_string(),
                    branch: None,
                    label: None,
                });
            }
            // NOTE(review): these edges are added whenever `repair` exists,
            // even if there is no `verify` node; `validate_workflow` would
            // then report unknown-node errors. Confirm this is intended.
            if graph.nodes.contains_key("repair") {
                graph.edges.push(WorkflowEdge {
                    from: "verify".to_string(),
                    to: "repair".to_string(),
                    branch: Some("failed".to_string()),
                    label: None,
                });
                graph.edges.push(WorkflowEdge {
                    from: "repair".to_string(),
                    to: "verify".to_string(),
                    branch: Some("retry".to_string()),
                    label: None,
                });
            }
        }
    }

    // Graph-level defaults.
    if graph.type_name.is_empty() {
        graph.type_name = "workflow_graph".to_string();
    }
    if graph.id.is_empty() {
        graph.id = new_id("workflow");
    }
    if graph.version == 0 {
        graph.version = 1;
    }
    if graph.entry.is_empty() {
        // Falls back to the first node key in map order, or "act" when the
        // graph has no nodes at all.
        graph.entry = graph
            .nodes
            .keys()
            .next()
            .cloned()
            .unwrap_or_else(|| "act".to_string());
    }
    // Per-node defaults.
    for (node_id, node) in &mut graph.nodes {
        // Recover the raw `tools` value from `nodes.<id>.tools` of the input.
        // NOTE(review): only `raw_tools` is restored here; `raw_auto_compact`
        // and `raw_model_policy` are not. Confirm that graph-form nodes do
        // not need them.
        if node.raw_tools.is_none() {
            node.raw_tools = as_dict
                .get("nodes")
                .and_then(|nodes| nodes.as_dict())
                .and_then(|nodes| nodes.get(node_id))
                .and_then(|node_value| node_value.as_dict())
                .and_then(|raw_node| raw_node.get("tools"))
                .cloned();
        }
        if node.id.is_none() {
            node.id = Some(node_id.clone());
        }
        if node.kind.is_empty() {
            node.kind = "stage".to_string();
        }
        if node.join_policy.strategy.is_empty() {
            node.join_policy.strategy = "all".to_string();
        }
        if node.reduce_policy.strategy.is_empty() {
            node.reduce_policy.strategy = "concat".to_string();
        }
        // Default output kind depends on the node kind.
        if node.output_contract.output_kinds.is_empty() {
            node.output_contract.output_kinds = vec![match node.kind.as_str() {
                "verify" => "verification_result".to_string(),
                "reduce" => node
                    .reduce_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "summary".to_string()),
                "map" => node
                    .map_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "artifact".to_string()),
                "escalation" => "plan".to_string(),
                _ => "artifact".to_string(),
            }];
        }
        // Every node gets at least one attempt.
        if node.retry_policy.max_attempts == 0 {
            node.retry_policy.max_attempts = 1;
        }
    }
    Ok(graph)
}
898
899pub fn validate_workflow(
900 graph: &WorkflowGraph,
901 ceiling: Option<&CapabilityPolicy>,
902) -> WorkflowValidationReport {
903 let mut errors = Vec::new();
904 let mut warnings = Vec::new();
905
906 if !graph.nodes.contains_key(&graph.entry) {
907 errors.push(format!("entry node does not exist: {}", graph.entry));
908 }
909
910 let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
911 for edge in &graph.edges {
912 if !node_ids.contains(&edge.from) {
913 errors.push(format!("edge.from references unknown node: {}", edge.from));
914 }
915 if !node_ids.contains(&edge.to) {
916 errors.push(format!("edge.to references unknown node: {}", edge.to));
917 }
918 }
919
920 let reachable_nodes = reachable_nodes(graph);
921 for node_id in &node_ids {
922 if !reachable_nodes.contains(node_id) {
923 warnings.push(format!("node is unreachable: {node_id}"));
924 }
925 }
926
927 for (node_id, node) in &graph.nodes {
928 let incoming = graph
929 .edges
930 .iter()
931 .filter(|edge| edge.to == *node_id)
932 .count();
933 let outgoing: Vec<&WorkflowEdge> = graph
934 .edges
935 .iter()
936 .filter(|edge| edge.from == *node_id)
937 .collect();
938 if let Some(min_inputs) = node.input_contract.min_inputs {
939 if let Some(max_inputs) = node.input_contract.max_inputs {
940 if min_inputs > max_inputs {
941 errors.push(format!(
942 "node {node_id}: input contract min_inputs exceeds max_inputs"
943 ));
944 }
945 }
946 }
947 match node.kind.as_str() {
948 "condition" => {
949 let has_true = outgoing
950 .iter()
951 .any(|edge| edge.branch.as_deref() == Some("true"));
952 let has_false = outgoing
953 .iter()
954 .any(|edge| edge.branch.as_deref() == Some("false"));
955 if !has_true || !has_false {
956 errors.push(format!(
957 "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
958 ));
959 }
960 }
961 "fork" if outgoing.len() < 2 => {
962 errors.push(format!(
963 "node {node_id}: fork nodes require at least two outgoing edges"
964 ));
965 }
966 "join" if incoming < 2 => {
967 warnings.push(format!(
968 "node {node_id}: join node has fewer than two incoming edges"
969 ));
970 }
971 "map"
972 if node.map_policy.items.is_empty()
973 && node.map_policy.item_artifact_kind.is_none()
974 && node.input_contract.input_kinds.is_empty() =>
975 {
976 errors.push(format!(
977 "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
978 ));
979 }
980 "reduce" if node.input_contract.input_kinds.is_empty() => {
981 warnings.push(format!(
982 "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
983 ));
984 }
985 _ => {}
986 }
987 }
988
989 if let Some(ceiling) = ceiling {
990 if let Err(error) = ceiling.intersect(&graph.capability_policy) {
991 errors.push(error);
992 }
993 for (node_id, node) in &graph.nodes {
994 if let Err(error) = ceiling.intersect(&node.capability_policy) {
995 errors.push(format!("node {node_id}: {error}"));
996 }
997 }
998 }
999
1000 WorkflowValidationReport {
1001 valid: errors.is_empty(),
1002 errors,
1003 warnings,
1004 reachable_nodes: reachable_nodes.into_iter().collect(),
1005 }
1006}
1007
1008fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
1009 let mut seen = BTreeSet::new();
1010 let mut stack = vec![graph.entry.clone()];
1011 while let Some(node_id) = stack.pop() {
1012 if !seen.insert(node_id.clone()) {
1013 continue;
1014 }
1015 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
1016 stack.push(edge.to.clone());
1017 }
1018 }
1019 seen
1020}
1021
1022fn resolve_stage_skill_registry(node: &WorkflowNode) -> Option<VmValue> {
1034 let per_node = node
1035 .raw_model_policy
1036 .as_ref()
1037 .and_then(|v| v.as_dict())
1038 .and_then(|d| d.get("skills"))
1039 .cloned()
1040 .and_then(normalize_inline_registry);
1041 if per_node.is_some() {
1042 return per_node;
1043 }
1044 super::current_workflow_skill_context().and_then(|ctx| ctx.registry)
1045}
1046
1047fn resolve_stage_skill_match(node: &WorkflowNode) -> crate::llm::SkillMatchConfig {
1051 let per_node = node
1052 .raw_model_policy
1053 .as_ref()
1054 .and_then(|v| v.as_dict())
1055 .and_then(|d| d.get("skill_match"))
1056 .and_then(|v| v.as_dict().cloned());
1057 if let Some(dict) = per_node {
1058 return crate::llm::parse_skill_match_config_dict(&dict);
1059 }
1060 super::current_workflow_skill_context()
1061 .and_then(|ctx| ctx.match_config)
1062 .and_then(|v| v.as_dict().cloned())
1063 .map(|d| crate::llm::parse_skill_match_config_dict(&d))
1064 .unwrap_or_default()
1065}
1066
1067fn normalize_inline_registry(value: VmValue) -> Option<VmValue> {
1072 use std::collections::BTreeMap;
1073 use std::rc::Rc;
1074 match &value {
1075 VmValue::Dict(d)
1076 if d.get("_type")
1077 .map(|v| v.display() == "skill_registry")
1078 .unwrap_or(false) =>
1079 {
1080 Some(value)
1081 }
1082 VmValue::List(list) => {
1083 let mut dict = BTreeMap::new();
1084 dict.insert(
1085 "_type".to_string(),
1086 VmValue::String(Rc::from("skill_registry")),
1087 );
1088 dict.insert("skills".to_string(), VmValue::List(list.clone()));
1089 Some(VmValue::Dict(Rc::new(dict)))
1090 }
1091 _ => None,
1092 }
1093}
1094
1095fn resolve_node_session_id(node: &WorkflowNode) -> String {
1096 if let Some(explicit) = node
1097 .raw_model_policy
1098 .as_ref()
1099 .and_then(|v| v.as_dict())
1100 .and_then(|d| d.get("session_id"))
1101 .and_then(|v| match v {
1102 VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
1103 _ => None,
1104 })
1105 {
1106 return explicit;
1107 }
1108 format!("workflow_stage_{}", uuid::Uuid::now_v7())
1109}
1110
1111pub async fn execute_stage_node(
1112 node_id: &str,
1113 node: &WorkflowNode,
1114 task: &str,
1115 artifacts: &[ArtifactRecord],
1116) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1117 let mut selection_policy = node.context_policy.clone();
1118 if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
1119 selection_policy.include_kinds = node.input_contract.input_kinds.clone();
1120 }
1121 let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
1122 let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
1123 let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1124 let rendered_verification = super::render_verification_context(&verification_contracts);
1125 let stage_session_id = resolve_node_session_id(node);
1126 if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1127 return Err(VmError::Runtime(format!(
1128 "workflow stage {node_id} requires an existing session \
1129 (call agent_session_open and feed session_id through model_policy \
1130 before entering this stage)"
1131 )));
1132 }
1133 if let Some(min_inputs) = node.input_contract.min_inputs {
1134 if selected.len() < min_inputs {
1135 return Err(VmError::Runtime(format!(
1136 "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1137 )));
1138 }
1139 }
1140 if let Some(max_inputs) = node.input_contract.max_inputs {
1141 if selected.len() > max_inputs {
1142 return Err(VmError::Runtime(format!(
1143 "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1144 )));
1145 }
1146 }
1147 let prompt = super::render_workflow_prompt(
1148 task,
1149 node.task_label.as_deref(),
1150 &rendered_verification,
1151 &rendered_context,
1152 );
1153
1154 let tool_format = node
1162 .model_policy
1163 .tool_format
1164 .clone()
1165 .filter(|value| !value.trim().is_empty())
1166 .or_else(|| {
1167 std::env::var("HARN_AGENT_TOOL_FORMAT")
1168 .ok()
1169 .filter(|value| !value.trim().is_empty())
1170 })
1171 .unwrap_or_else(|| {
1172 let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
1173 let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
1174 crate::llm_config::default_tool_format(&model, &provider)
1175 });
1176 let mut llm_result = if node.kind == "verify" {
1177 if let Some(command) = node
1178 .verify
1179 .as_ref()
1180 .and_then(|verify| verify.as_object())
1181 .and_then(|verify| verify.get("command"))
1182 .and_then(|value| value.as_str())
1183 .map(str::trim)
1184 .filter(|value| !value.is_empty())
1185 {
1186 let mut process = if cfg!(target_os = "windows") {
1187 let mut cmd = tokio::process::Command::new("cmd");
1188 cmd.arg("/C").arg(command);
1189 cmd
1190 } else {
1191 let mut cmd = tokio::process::Command::new("/bin/sh");
1192 cmd.arg("-lc").arg(command);
1193 cmd
1194 };
1195 process.stdin(std::process::Stdio::null());
1196 if let Some(context) = crate::stdlib::process::current_execution_context() {
1197 if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1198 process.current_dir(cwd);
1199 }
1200 if !context.env.is_empty() {
1201 process.envs(context.env);
1202 }
1203 }
1204 let output = process
1205 .output()
1206 .await
1207 .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
1208 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1209 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1210 let combined = if stderr.is_empty() {
1211 stdout.clone()
1212 } else if stdout.is_empty() {
1213 stderr.clone()
1214 } else {
1215 format!("{stdout}\n{stderr}")
1216 };
1217 serde_json::json!({
1218 "status": "completed",
1219 "text": combined,
1220 "visible_text": combined,
1221 "command": command,
1222 "stdout": stdout,
1223 "stderr": stderr,
1224 "exit_status": output.status.code().unwrap_or(-1),
1225 "success": output.status.success(),
1226 })
1227 } else {
1228 serde_json::json!({
1229 "status": "completed",
1230 "text": "",
1231 "visible_text": "",
1232 })
1233 }
1234 } else {
1235 let mut options = BTreeMap::new();
1236 if let Some(provider) = &node.model_policy.provider {
1237 options.insert(
1238 "provider".to_string(),
1239 VmValue::String(Rc::from(provider.clone())),
1240 );
1241 }
1242 if let Some(model) = &node.model_policy.model {
1243 options.insert(
1244 "model".to_string(),
1245 VmValue::String(Rc::from(model.clone())),
1246 );
1247 }
1248 if let Some(model_tier) = &node.model_policy.model_tier {
1249 options.insert(
1250 "model_tier".to_string(),
1251 VmValue::String(Rc::from(model_tier.clone())),
1252 );
1253 }
1254 if let Some(temperature) = node.model_policy.temperature {
1255 options.insert("temperature".to_string(), VmValue::Float(temperature));
1256 }
1257 if let Some(max_tokens) = node.model_policy.max_tokens {
1258 options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
1259 }
1260 let tool_names = workflow_tool_names(&node.tools);
1261 let tools_value = node.raw_tools.clone().or_else(|| {
1262 if matches!(node.tools, serde_json::Value::Null) {
1263 None
1264 } else {
1265 Some(crate::stdlib::json_to_vm_value(&node.tools))
1266 }
1267 });
1268 if tools_value.is_some() && !tool_names.is_empty() {
1269 options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
1270 }
1271 options.insert(
1272 "session_id".to_string(),
1273 VmValue::String(Rc::from(stage_session_id.clone())),
1274 );
1275
1276 let args = vec![
1277 VmValue::String(Rc::from(prompt.clone())),
1278 node.system
1279 .clone()
1280 .map(|s| VmValue::String(Rc::from(s)))
1281 .unwrap_or(VmValue::Nil),
1282 VmValue::Dict(Rc::new(options)),
1283 ];
1284 let mut opts = extract_llm_options(&args)?;
1285
1286 if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
1287 let tool_policy = workflow_tool_policy_from_tools(&node.tools);
1288 let effective_policy = tool_policy
1289 .intersect(&node.capability_policy)
1290 .map_err(VmError::Runtime)?;
1291 let auto_compact = if node.auto_compact.enabled {
1292 let mut ac = crate::orchestration::AutoCompactConfig::default();
1293 if let Some(v) = node.auto_compact.token_threshold {
1294 ac.token_threshold = v;
1295 }
1296 if let Some(v) = node.auto_compact.tool_output_max_chars {
1297 ac.tool_output_max_chars = v;
1298 }
1299 if let Some(ref strategy) = node.auto_compact.compact_strategy {
1300 if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1301 ac.compact_strategy = s;
1302 }
1303 }
1304 if let Some(v) = node.auto_compact.hard_limit_tokens {
1305 ac.hard_limit_tokens = Some(v);
1306 }
1307 if let Some(ref strategy) = node.auto_compact.hard_limit_strategy {
1308 if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1309 ac.hard_limit_strategy = s;
1310 }
1311 }
1312 if let Some(ref raw_ac) = node.raw_auto_compact {
1315 if let Some(dict) = raw_ac.as_dict() {
1316 if let Some(cb) = dict.get("compress_callback") {
1317 ac.compress_callback = Some(cb.clone());
1318 }
1319 if let Some(cb) = dict.get("mask_callback") {
1320 ac.mask_callback = Some(cb.clone());
1321 }
1322 if let Some(cb) = dict.get("custom_compactor") {
1323 ac.custom_compactor = Some(cb.clone());
1324 }
1325 }
1326 }
1327 {
1328 let user_specified_threshold = node.auto_compact.token_threshold.is_some();
1329 let user_specified_hard_limit = node.auto_compact.hard_limit_tokens.is_some();
1330 crate::llm::api::adapt_auto_compact_to_provider(
1331 &mut ac,
1332 user_specified_threshold,
1333 user_specified_hard_limit,
1334 &opts.provider,
1335 &opts.model,
1336 &opts.api_key,
1337 )
1338 .await;
1339 }
1340 Some(ac)
1341 } else {
1342 None
1343 };
1344 crate::llm::run_agent_loop_internal(
1345 &mut opts,
1346 crate::llm::AgentLoopConfig {
1347 persistent: true,
1348 max_iterations: node.model_policy.max_iterations.unwrap_or(16),
1349 max_nudges: node.model_policy.max_nudges.unwrap_or(3),
1350 nudge: node.model_policy.nudge.clone(),
1351 done_sentinel: node.done_sentinel.clone(),
1352 break_unless_phase: None,
1353 tool_retries: 0,
1354 tool_backoff_ms: 1000,
1355 tool_format: tool_format.clone(),
1356 auto_compact,
1357 policy: Some(effective_policy),
1358 approval_policy: Some(node.approval_policy.clone()),
1359 daemon: false,
1360 daemon_config: Default::default(),
1361 llm_retries: 2,
1362 llm_backoff_ms: 2000,
1363 token_budget: None,
1364 exit_when_verified: node.exit_when_verified,
1365 loop_detect_warn: 2,
1366 loop_detect_block: 3,
1367 loop_detect_skip: 4,
1368 tool_examples: node.model_policy.tool_examples.clone(),
1369 turn_policy: node.model_policy.turn_policy.clone(),
1370 stop_after_successful_tools: node
1371 .model_policy
1372 .stop_after_successful_tools
1373 .clone(),
1374 require_successful_tools: node.model_policy.require_successful_tools.clone(),
1375 session_id: stage_session_id.clone(),
1379 event_sink: None,
1380 task_ledger: node
1384 .raw_model_policy
1385 .as_ref()
1386 .and_then(|v| v.as_dict())
1387 .and_then(|d| d.get("task_ledger"))
1388 .map(crate::llm::helpers::vm_value_to_json)
1389 .and_then(|json| serde_json::from_value(json).ok())
1390 .unwrap_or_default(),
1391 post_turn_callback: node
1392 .raw_model_policy
1393 .as_ref()
1394 .and_then(|v| v.as_dict())
1395 .and_then(|d| d.get("post_turn_callback"))
1396 .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
1397 .cloned(),
1398 skill_registry: resolve_stage_skill_registry(node),
1405 skill_match: resolve_stage_skill_match(node),
1406 working_files: Vec::new(),
1407 },
1408 )
1409 .await?
1410 } else {
1411 let result = vm_call_llm_full(&opts).await?;
1412 crate::llm::agent_loop_result_from_llm(&result, opts)
1413 }
1414 };
1415 if let Some(payload) = llm_result.as_object_mut() {
1416 payload.insert("prompt".to_string(), serde_json::json!(prompt));
1417 payload.insert(
1418 "system_prompt".to_string(),
1419 serde_json::json!(node.system.clone().unwrap_or_default()),
1420 );
1421 payload.insert(
1422 "rendered_context".to_string(),
1423 serde_json::json!(rendered_context),
1424 );
1425 if !verification_contracts.is_empty() {
1426 payload.insert(
1427 "verification_contracts".to_string(),
1428 serde_json::to_value(&verification_contracts).unwrap_or_default(),
1429 );
1430 payload.insert(
1431 "rendered_verification_context".to_string(),
1432 serde_json::json!(rendered_verification),
1433 );
1434 }
1435 payload.insert(
1436 "selected_artifact_ids".to_string(),
1437 serde_json::json!(selected
1438 .iter()
1439 .map(|artifact| artifact.id.clone())
1440 .collect::<Vec<_>>()),
1441 );
1442 payload.insert(
1443 "selected_artifact_titles".to_string(),
1444 serde_json::json!(selected
1445 .iter()
1446 .map(|artifact| artifact.title.clone())
1447 .collect::<Vec<_>>()),
1448 );
1449 payload.insert(
1450 "tool_calling_mode".to_string(),
1451 serde_json::json!(tool_format.clone()),
1452 );
1453 }
1454
1455 let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1456 let result_transcript = llm_result
1460 .get("transcript")
1461 .cloned()
1462 .map(|value| crate::stdlib::json_to_vm_value(&value));
1463 let session_transcript = crate::agent_sessions::snapshot(&stage_session_id);
1464 let transcript = result_transcript
1465 .or(session_transcript)
1466 .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1467 let output_kind = node
1468 .output_contract
1469 .output_kinds
1470 .first()
1471 .cloned()
1472 .unwrap_or_else(|| {
1473 if node.kind == "verify" {
1474 "verification_result".to_string()
1475 } else {
1476 "artifact".to_string()
1477 }
1478 });
1479 let mut metadata = BTreeMap::new();
1480 metadata.insert(
1481 "input_artifact_ids".to_string(),
1482 serde_json::json!(selected
1483 .iter()
1484 .map(|artifact| artifact.id.clone())
1485 .collect::<Vec<_>>()),
1486 );
1487 metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1488 if !node.approval_policy.write_path_allowlist.is_empty() {
1489 metadata.insert(
1490 "changed_paths".to_string(),
1491 serde_json::json!(node.approval_policy.write_path_allowlist),
1492 );
1493 }
1494 let artifact = ArtifactRecord {
1495 type_name: "artifact".to_string(),
1496 id: new_id("artifact"),
1497 kind: output_kind,
1498 title: Some(format!("stage {node_id} output")),
1499 text: Some(visible_text),
1500 data: Some(llm_result.clone()),
1501 source: Some(node_id.to_string()),
1502 created_at: now_rfc3339(),
1503 freshness: Some("fresh".to_string()),
1504 priority: None,
1505 lineage: selected
1506 .iter()
1507 .map(|artifact| artifact.id.clone())
1508 .collect(),
1509 relevance: Some(1.0),
1510 estimated_tokens: None,
1511 stage: Some(node_id.to_string()),
1512 metadata,
1513 }
1514 .normalize();
1515
1516 Ok((llm_result, vec![artifact], transcript))
1517}
1518
1519pub fn next_nodes_for(
1520 graph: &WorkflowGraph,
1521 current: &str,
1522 branch: Option<&str>,
1523) -> Vec<WorkflowEdge> {
1524 let mut matching: Vec<WorkflowEdge> = graph
1525 .edges
1526 .iter()
1527 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1528 .cloned()
1529 .collect();
1530 if matching.is_empty() {
1531 matching = graph
1532 .edges
1533 .iter()
1534 .filter(|edge| edge.from == current && edge.branch.is_none())
1535 .cloned()
1536 .collect();
1537 }
1538 matching
1539}
1540
1541pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1542 next_nodes_for(graph, current, Some(branch))
1543 .into_iter()
1544 .next()
1545 .map(|edge| edge.to)
1546}
1547
1548pub fn append_audit_entry(
1549 graph: &mut WorkflowGraph,
1550 op: &str,
1551 node_id: Option<String>,
1552 reason: Option<String>,
1553 metadata: BTreeMap<String, serde_json::Value>,
1554) {
1555 graph.audit_log.push(WorkflowAuditEntry {
1556 id: new_id("audit"),
1557 op: op.to_string(),
1558 node_id,
1559 timestamp: now_rfc3339(),
1560 reason,
1561 metadata,
1562 });
1563}