1use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10 BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11 ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
17pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
18
19#[derive(Clone, Debug, Default, Serialize, Deserialize)]
20#[serde(default)]
21pub struct WorkflowNode {
22 pub id: Option<String>,
23 pub kind: String,
24 pub mode: Option<String>,
25 pub prompt: Option<String>,
26 pub system: Option<String>,
27 pub task_label: Option<String>,
28 pub done_sentinel: Option<String>,
29 pub tools: serde_json::Value,
30 pub model_policy: ModelPolicy,
31 pub auto_compact: AutoCompactPolicy,
36 #[serde(default)]
41 pub output_visibility: Option<String>,
42 pub context_policy: ContextPolicy,
43 pub retry_policy: RetryPolicy,
44 pub capability_policy: CapabilityPolicy,
45 pub approval_policy: super::ToolApprovalPolicy,
46 pub input_contract: StageContract,
47 pub output_contract: StageContract,
48 pub branch_semantics: BranchSemantics,
49 pub map_policy: MapPolicy,
50 pub join_policy: JoinPolicy,
51 pub reduce_policy: ReducePolicy,
52 pub escalation_policy: EscalationPolicy,
53 pub verify: Option<serde_json::Value>,
54 #[serde(default)]
59 pub exit_when_verified: bool,
60 pub metadata: BTreeMap<String, serde_json::Value>,
61 #[serde(skip)]
62 pub raw_tools: Option<VmValue>,
63 #[serde(skip)]
67 pub raw_auto_compact: Option<VmValue>,
68 #[serde(skip)]
71 pub raw_model_policy: Option<VmValue>,
72}
73
74impl PartialEq for WorkflowNode {
75 fn eq(&self, other: &Self) -> bool {
76 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
77 }
78}
79
80#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
81#[serde(default)]
82pub struct VerificationRequirement {
83 pub kind: String,
84 pub value: String,
85 pub note: Option<String>,
86}
87
88#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
89#[serde(default)]
90pub struct VerificationContract {
91 pub source_node: Option<String>,
92 pub summary: Option<String>,
93 pub command: Option<String>,
94 pub expect_status: Option<i64>,
95 pub assert_text: Option<String>,
96 pub expect_text: Option<String>,
97 pub required_identifiers: Vec<String>,
98 pub required_paths: Vec<String>,
99 pub required_text: Vec<String>,
100 pub notes: Vec<String>,
101 pub checks: Vec<VerificationRequirement>,
102}
103
104impl VerificationContract {
105 fn is_empty(&self) -> bool {
106 self.summary.is_none()
107 && self.command.is_none()
108 && self.expect_status.is_none()
109 && self.assert_text.is_none()
110 && self.expect_text.is_none()
111 && self.required_identifiers.is_empty()
112 && self.required_paths.is_empty()
113 && self.required_text.is_empty()
114 && self.notes.is_empty()
115 && self.checks.is_empty()
116 }
117}
118
119fn push_unique_string(values: &mut Vec<String>, value: &str) {
120 let trimmed = value.trim();
121 if trimmed.is_empty() {
122 return;
123 }
124 if !values.iter().any(|existing| existing == trimmed) {
125 values.push(trimmed.to_string());
126 }
127}
128
129fn push_unique_requirement(
130 values: &mut Vec<VerificationRequirement>,
131 kind: &str,
132 value: &str,
133 note: Option<&str>,
134) {
135 let trimmed_kind = kind.trim();
136 let trimmed_value = value.trim();
137 let trimmed_note = note
138 .map(str::trim)
139 .filter(|candidate| !candidate.is_empty())
140 .map(|candidate| candidate.to_string());
141 if trimmed_kind.is_empty() || trimmed_value.is_empty() {
142 return;
143 }
144 let candidate = VerificationRequirement {
145 kind: trimmed_kind.to_string(),
146 value: trimmed_value.to_string(),
147 note: trimmed_note,
148 };
149 if !values.iter().any(|existing| existing == &candidate) {
150 values.push(candidate);
151 }
152}
153
154fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
155 match value {
156 Some(serde_json::Value::String(text)) => {
157 let mut values = Vec::new();
158 push_unique_string(&mut values, text);
159 values
160 }
161 Some(serde_json::Value::Array(items)) => {
162 let mut values = Vec::new();
163 for item in items {
164 if let Some(text) = item.as_str() {
165 push_unique_string(&mut values, text);
166 }
167 }
168 values
169 }
170 _ => Vec::new(),
171 }
172}
173
174fn merge_verification_requirement_list(
175 target: &mut Vec<VerificationRequirement>,
176 value: Option<&serde_json::Value>,
177) {
178 let Some(items) = value.and_then(|raw| raw.as_array()) else {
179 return;
180 };
181 for item in items {
182 let Some(object) = item.as_object() else {
183 continue;
184 };
185 let kind = object
186 .get("kind")
187 .and_then(|value| value.as_str())
188 .unwrap_or_default();
189 let value = object
190 .get("value")
191 .and_then(|value| value.as_str())
192 .unwrap_or_default();
193 let note = object
194 .get("note")
195 .or_else(|| object.get("description"))
196 .or_else(|| object.get("reason"))
197 .and_then(|value| value.as_str());
198 push_unique_requirement(target, kind, value, note);
199 }
200}
201
202fn merge_verification_contract_fields(
203 target: &mut VerificationContract,
204 object: &serde_json::Map<String, serde_json::Value>,
205) {
206 if target.summary.is_none() {
207 target.summary = object
208 .get("summary")
209 .and_then(|value| value.as_str())
210 .map(str::trim)
211 .filter(|value| !value.is_empty())
212 .map(|value| value.to_string());
213 }
214 if target.command.is_none() {
215 target.command = object
216 .get("command")
217 .and_then(|value| value.as_str())
218 .map(str::trim)
219 .filter(|value| !value.is_empty())
220 .map(|value| value.to_string());
221 }
222 if target.expect_status.is_none() {
223 target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
224 }
225 if target.assert_text.is_none() {
226 target.assert_text = object
227 .get("assert_text")
228 .and_then(|value| value.as_str())
229 .map(str::trim)
230 .filter(|value| !value.is_empty())
231 .map(|value| value.to_string());
232 }
233 if target.expect_text.is_none() {
234 target.expect_text = object
235 .get("expect_text")
236 .and_then(|value| value.as_str())
237 .map(str::trim)
238 .filter(|value| !value.is_empty())
239 .map(|value| value.to_string());
240 }
241
242 for value in json_string_list(
243 object
244 .get("required_identifiers")
245 .or_else(|| object.get("identifiers")),
246 ) {
247 push_unique_string(&mut target.required_identifiers, &value);
248 }
249 for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
250 push_unique_string(&mut target.required_paths, &value);
251 }
252 for value in json_string_list(
253 object
254 .get("required_text")
255 .or_else(|| object.get("exact_text"))
256 .or_else(|| object.get("required_strings")),
257 ) {
258 push_unique_string(&mut target.required_text, &value);
259 }
260 for value in json_string_list(object.get("notes")) {
261 push_unique_string(&mut target.notes, &value);
262 }
263 merge_verification_requirement_list(&mut target.checks, object.get("checks"));
264}
265
266fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
267 let resolved = crate::stdlib::process::resolve_source_asset_path(path);
268 let contents = std::fs::read_to_string(&resolved).map_err(|error| {
269 VmError::Runtime(format!(
270 "workflow verification contract read failed for {}: {error}",
271 resolved.display()
272 ))
273 })?;
274 serde_json::from_str(&contents).map_err(|error| {
275 VmError::Runtime(format!(
276 "workflow verification contract parse failed for {}: {error}",
277 resolved.display()
278 ))
279 })
280}
281
282fn resolve_verification_contract_path(
283 verify: &serde_json::Map<String, serde_json::Value>,
284) -> Result<Option<serde_json::Value>, VmError> {
285 let Some(path) = verify
286 .get("contract_path")
287 .or_else(|| verify.get("verification_contract_path"))
288 .and_then(|value| value.as_str())
289 .map(str::trim)
290 .filter(|value| !value.is_empty())
291 else {
292 return Ok(None);
293 };
294 Ok(Some(load_verification_contract_file(path)?))
295}
296
297pub fn verification_contract_from_verify(
298 node_id: &str,
299 verify: Option<&serde_json::Value>,
300) -> Result<Option<VerificationContract>, VmError> {
301 let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
302 return Ok(None);
303 };
304
305 let mut contract = VerificationContract {
306 source_node: Some(node_id.to_string()),
307 ..Default::default()
308 };
309
310 if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
311 let Some(object) = file_contract.as_object() else {
312 return Err(VmError::Runtime(
313 "workflow verification contract file must parse to a JSON object".to_string(),
314 ));
315 };
316 merge_verification_contract_fields(&mut contract, object);
317 }
318
319 if let Some(inline_contract) = verify_object.get("contract") {
320 let Some(object) = inline_contract.as_object() else {
321 return Err(VmError::Runtime(
322 "workflow verify.contract must be an object".to_string(),
323 ));
324 };
325 merge_verification_contract_fields(&mut contract, object);
326 }
327
328 merge_verification_contract_fields(&mut contract, verify_object);
329
330 if let Some(assert_text) = contract.assert_text.clone() {
331 push_unique_requirement(
332 &mut contract.checks,
333 "visible_text_contains",
334 &assert_text,
335 Some("verify stage requires visible output to contain this text"),
336 );
337 }
338 if let Some(expect_text) = contract.expect_text.clone() {
339 push_unique_requirement(
340 &mut contract.checks,
341 "combined_output_contains",
342 &expect_text,
343 Some("verify command requires combined stdout/stderr to contain this text"),
344 );
345 }
346 if let Some(expect_status) = contract.expect_status {
347 push_unique_requirement(
348 &mut contract.checks,
349 "expect_status",
350 &expect_status.to_string(),
351 Some("verify command exit status must match exactly"),
352 );
353 }
354 for identifier in contract.required_identifiers.clone() {
355 push_unique_requirement(
356 &mut contract.checks,
357 "identifier",
358 &identifier,
359 Some("use this exact identifier spelling"),
360 );
361 }
362 for path in contract.required_paths.clone() {
363 push_unique_requirement(
364 &mut contract.checks,
365 "path",
366 &path,
367 Some("preserve this exact path"),
368 );
369 }
370 for text in contract.required_text.clone() {
371 push_unique_requirement(
372 &mut contract.checks,
373 "text",
374 &text,
375 Some("required exact text or wiring snippet"),
376 );
377 }
378
379 if contract.is_empty() {
380 return Ok(None);
381 }
382 Ok(Some(contract))
383}
384
385fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
386 if !values.iter().any(|existing| existing == &candidate) {
387 values.push(candidate);
388 }
389}
390
391pub fn workflow_verification_contracts(
392 graph: &WorkflowGraph,
393) -> Result<Vec<VerificationContract>, VmError> {
394 let mut contracts = Vec::new();
395 for (node_id, node) in &graph.nodes {
396 if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
397 push_unique_contract(&mut contracts, contract);
398 }
399 }
400 Ok(contracts)
401}
402
403pub fn inject_workflow_verification_contracts(
404 node: &mut WorkflowNode,
405 contracts: &[VerificationContract],
406) {
407 if contracts.is_empty() {
408 return;
409 }
410 node.metadata.insert(
411 WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
412 serde_json::to_value(contracts).unwrap_or_default(),
413 );
414}
415
416pub fn stage_verification_contracts(
417 node_id: &str,
418 node: &WorkflowNode,
419) -> Result<Vec<VerificationContract>, VmError> {
420 let mut contracts = node
421 .metadata
422 .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
423 .cloned()
424 .map(|value| {
425 serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
426 VmError::Runtime(format!(
427 "workflow stage {node_id} verification contract metadata parse failed: {error}"
428 ))
429 })
430 })
431 .transpose()?
432 .unwrap_or_default();
433
434 if let Some(local_contract) = verification_contract_from_verify(node_id, node.verify.as_ref())?
435 {
436 push_unique_contract(&mut contracts, local_contract);
437 }
438 Ok(contracts)
439}
440
441pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
442 match value {
443 serde_json::Value::Null => Vec::new(),
444 serde_json::Value::Array(items) => items
445 .iter()
446 .filter_map(|item| match item {
447 serde_json::Value::Object(map) => map
448 .get("name")
449 .and_then(|value| value.as_str())
450 .filter(|name| !name.is_empty())
451 .map(|name| name.to_string()),
452 _ => None,
453 })
454 .collect(),
455 serde_json::Value::Object(map) => {
456 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
457 return map
458 .get("tools")
459 .map(workflow_tool_names)
460 .unwrap_or_default();
461 }
462 map.get("name")
463 .and_then(|value| value.as_str())
464 .filter(|name| !name.is_empty())
465 .map(|name| vec![name.to_string()])
466 .unwrap_or_default()
467 }
468 _ => Vec::new(),
469 }
470}
471
472fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
473 fn rank(v: &str) -> usize {
474 match v {
475 "none" => 0,
476 "read_only" => 1,
477 "workspace_write" => 2,
478 "process_exec" => 3,
479 "network" => 4,
480 _ => 5,
481 }
482 }
483 levels.max_by_key(|level| rank(level))
484}
485
486fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
487 match value.and_then(|v| v.as_str()).unwrap_or("") {
488 "read" => ToolKind::Read,
489 "edit" => ToolKind::Edit,
490 "delete" => ToolKind::Delete,
491 "move" => ToolKind::Move,
492 "search" => ToolKind::Search,
493 "execute" => ToolKind::Execute,
494 "think" => ToolKind::Think,
495 "fetch" => ToolKind::Fetch,
496 _ => ToolKind::Other,
497 }
498}
499
500fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
501 let policy = map
502 .get("policy")
503 .and_then(|value| value.as_object())
504 .cloned()
505 .unwrap_or_default();
506
507 let capabilities = policy
508 .get("capabilities")
509 .and_then(|value| value.as_object())
510 .map(|caps| {
511 caps.iter()
512 .map(|(capability, ops)| {
513 let values = ops
514 .as_array()
515 .map(|items| {
516 items
517 .iter()
518 .filter_map(|item| item.as_str().map(|s| s.to_string()))
519 .collect::<Vec<_>>()
520 })
521 .unwrap_or_default();
522 (capability.clone(), values)
523 })
524 .collect::<BTreeMap<_, _>>()
525 })
526 .unwrap_or_default();
527
528 let arg_schema = if let Some(schema) = policy.get("arg_schema") {
531 serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
532 } else {
533 ToolArgSchema {
534 path_params: policy
535 .get("path_params")
536 .and_then(|value| value.as_array())
537 .map(|items| {
538 items
539 .iter()
540 .filter_map(|item| item.as_str().map(|s| s.to_string()))
541 .collect::<Vec<_>>()
542 })
543 .unwrap_or_default(),
544 arg_aliases: policy
545 .get("arg_aliases")
546 .and_then(|value| value.as_object())
547 .map(|aliases| {
548 aliases
549 .iter()
550 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
551 .collect::<BTreeMap<_, _>>()
552 })
553 .unwrap_or_default(),
554 required: policy
555 .get("required")
556 .and_then(|value| value.as_array())
557 .map(|items| {
558 items
559 .iter()
560 .filter_map(|item| item.as_str().map(|s| s.to_string()))
561 .collect::<Vec<_>>()
562 })
563 .unwrap_or_default(),
564 }
565 };
566
567 let kind = parse_tool_kind(policy.get("kind"));
568 let side_effect_level = policy
569 .get("side_effect_level")
570 .and_then(|value| value.as_str())
571 .map(SideEffectLevel::parse)
572 .unwrap_or_default();
573
574 ToolAnnotations {
575 kind,
576 side_effect_level,
577 arg_schema,
578 capabilities,
579 }
580}
581
582pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
583 match value {
584 serde_json::Value::Null => BTreeMap::new(),
585 serde_json::Value::Array(items) => items
586 .iter()
587 .filter_map(|item| match item {
588 serde_json::Value::Object(map) => map
589 .get("name")
590 .and_then(|value| value.as_str())
591 .filter(|name| !name.is_empty())
592 .map(|name| (name.to_string(), parse_tool_annotations(map))),
593 _ => None,
594 })
595 .collect(),
596 serde_json::Value::Object(map) => {
597 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
598 return map
599 .get("tools")
600 .map(workflow_tool_annotations)
601 .unwrap_or_default();
602 }
603 map.get("name")
604 .and_then(|value| value.as_str())
605 .filter(|name| !name.is_empty())
606 .map(|name| {
607 let mut annotations = BTreeMap::new();
608 annotations.insert(name.to_string(), parse_tool_annotations(map));
609 annotations
610 })
611 .unwrap_or_default()
612 }
613 _ => BTreeMap::new(),
614 }
615}
616
617pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
618 let tools = workflow_tool_names(value);
619 let tool_annotations = workflow_tool_annotations(value);
620 let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
621 for annotations in tool_annotations.values() {
622 for (capability, ops) in &annotations.capabilities {
623 let entry = capabilities.entry(capability.clone()).or_default();
624 for op in ops {
625 if !entry.contains(op) {
626 entry.push(op.clone());
627 }
628 }
629 entry.sort();
630 }
631 }
632 let side_effect_level = max_side_effect_level(
633 tool_annotations
634 .values()
635 .map(|annotations| annotations.side_effect_level.as_str().to_string())
636 .filter(|level| level != "none"),
637 );
638 CapabilityPolicy {
639 tools,
640 capabilities,
641 workspace_roots: Vec::new(),
642 side_effect_level,
643 recursion_limit: None,
644 tool_arg_constraints: Vec::new(),
645 tool_annotations,
646 }
647}
648
649#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
650#[serde(default)]
651pub struct WorkflowEdge {
652 pub from: String,
653 pub to: String,
654 pub branch: Option<String>,
655 pub label: Option<String>,
656}
657
658#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
659#[serde(default)]
660pub struct WorkflowGraph {
661 #[serde(rename = "_type")]
662 pub type_name: String,
663 pub id: String,
664 pub name: Option<String>,
665 pub version: usize,
666 pub entry: String,
667 pub nodes: BTreeMap<String, WorkflowNode>,
668 pub edges: Vec<WorkflowEdge>,
669 pub capability_policy: CapabilityPolicy,
670 pub approval_policy: super::ToolApprovalPolicy,
671 pub metadata: BTreeMap<String, serde_json::Value>,
672 pub audit_log: Vec<WorkflowAuditEntry>,
673}
674
675#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
676#[serde(default)]
677pub struct WorkflowAuditEntry {
678 pub id: String,
679 pub op: String,
680 pub node_id: Option<String>,
681 pub timestamp: String,
682 pub reason: Option<String>,
683 pub metadata: BTreeMap<String, serde_json::Value>,
684}
685
686#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
687#[serde(default)]
688pub struct WorkflowValidationReport {
689 pub valid: bool,
690 pub errors: Vec<String>,
691 pub warnings: Vec<String>,
692 pub reachable_nodes: Vec<String>,
693}
694
695pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
696 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
697 let dict = value.as_dict();
698 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
699 node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
700 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
701 Ok(node)
702}
703
704pub fn parse_workflow_node_json(
705 json: serde_json::Value,
706 label: &str,
707) -> Result<WorkflowNode, VmError> {
708 super::parse_json_payload(json, label)
709}
710
711pub fn parse_workflow_edge_json(
712 json: serde_json::Value,
713 label: &str,
714) -> Result<WorkflowEdge, VmError> {
715 super::parse_json_payload(json, label)
716}
717
718pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
719 let mut graph: WorkflowGraph = super::parse_json_value(value)?;
720 let as_dict = value.as_dict().cloned().unwrap_or_default();
721
722 if graph.nodes.is_empty() {
723 for key in ["act", "verify", "repair"] {
724 if let Some(node_value) = as_dict.get(key) {
725 let mut node = parse_workflow_node_value(node_value, "orchestration")?;
726 let raw_node = node_value.as_dict().cloned().unwrap_or_default();
727 node.id = Some(key.to_string());
728 if node.kind.is_empty() {
729 node.kind = if key == "verify" {
730 "verify".to_string()
731 } else {
732 "stage".to_string()
733 };
734 }
735 if node.model_policy.provider.is_none() {
736 node.model_policy.provider = as_dict
737 .get("provider")
738 .map(|value| value.display())
739 .filter(|value| !value.is_empty());
740 }
741 if node.model_policy.model.is_none() {
742 node.model_policy.model = as_dict
743 .get("model")
744 .map(|value| value.display())
745 .filter(|value| !value.is_empty());
746 }
747 if node.model_policy.model_tier.is_none() {
748 node.model_policy.model_tier = as_dict
749 .get("model_tier")
750 .or_else(|| as_dict.get("tier"))
751 .map(|value| value.display())
752 .filter(|value| !value.is_empty());
753 }
754 if node.model_policy.temperature.is_none() {
755 node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
756 if let VmValue::Float(number) = value {
757 Some(*number)
758 } else {
759 value.as_int().map(|number| number as f64)
760 }
761 });
762 }
763 if node.model_policy.max_tokens.is_none() {
764 node.model_policy.max_tokens =
765 as_dict.get("max_tokens").and_then(|value| value.as_int());
766 }
767 if node.mode.is_none() {
768 node.mode = as_dict
769 .get("mode")
770 .map(|value| value.display())
771 .filter(|value| !value.is_empty());
772 }
773 if node.done_sentinel.is_none() {
774 node.done_sentinel = as_dict
775 .get("done_sentinel")
776 .map(|value| value.display())
777 .filter(|value| !value.is_empty());
778 }
779 if key == "verify"
780 && node.verify.is_none()
781 && (raw_node.contains_key("assert_text")
782 || raw_node.contains_key("command")
783 || raw_node.contains_key("expect_status")
784 || raw_node.contains_key("expect_text"))
785 {
786 node.verify = Some(serde_json::json!({
787 "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
788 "command": raw_node.get("command").map(vm_value_to_json),
789 "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
790 "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
791 }));
792 }
793 graph.nodes.insert(key.to_string(), node);
794 }
795 }
796 if graph.entry.is_empty() && graph.nodes.contains_key("act") {
797 graph.entry = "act".to_string();
798 }
799 if graph.edges.is_empty() && graph.nodes.contains_key("act") {
800 if graph.nodes.contains_key("verify") {
801 graph.edges.push(WorkflowEdge {
802 from: "act".to_string(),
803 to: "verify".to_string(),
804 branch: None,
805 label: None,
806 });
807 }
808 if graph.nodes.contains_key("repair") {
809 graph.edges.push(WorkflowEdge {
810 from: "verify".to_string(),
811 to: "repair".to_string(),
812 branch: Some("failed".to_string()),
813 label: None,
814 });
815 graph.edges.push(WorkflowEdge {
816 from: "repair".to_string(),
817 to: "verify".to_string(),
818 branch: Some("retry".to_string()),
819 label: None,
820 });
821 }
822 }
823 }
824
825 if graph.type_name.is_empty() {
826 graph.type_name = "workflow_graph".to_string();
827 }
828 if graph.id.is_empty() {
829 graph.id = new_id("workflow");
830 }
831 if graph.version == 0 {
832 graph.version = 1;
833 }
834 if graph.entry.is_empty() {
835 graph.entry = graph
836 .nodes
837 .keys()
838 .next()
839 .cloned()
840 .unwrap_or_else(|| "act".to_string());
841 }
842 for (node_id, node) in &mut graph.nodes {
843 if node.raw_tools.is_none() {
844 node.raw_tools = as_dict
845 .get("nodes")
846 .and_then(|nodes| nodes.as_dict())
847 .and_then(|nodes| nodes.get(node_id))
848 .and_then(|node_value| node_value.as_dict())
849 .and_then(|raw_node| raw_node.get("tools"))
850 .cloned();
851 }
852 if node.id.is_none() {
853 node.id = Some(node_id.clone());
854 }
855 if node.kind.is_empty() {
856 node.kind = "stage".to_string();
857 }
858 if node.join_policy.strategy.is_empty() {
859 node.join_policy.strategy = "all".to_string();
860 }
861 if node.reduce_policy.strategy.is_empty() {
862 node.reduce_policy.strategy = "concat".to_string();
863 }
864 if node.output_contract.output_kinds.is_empty() {
865 node.output_contract.output_kinds = vec![match node.kind.as_str() {
866 "verify" => "verification_result".to_string(),
867 "reduce" => node
868 .reduce_policy
869 .output_kind
870 .clone()
871 .unwrap_or_else(|| "summary".to_string()),
872 "map" => node
873 .map_policy
874 .output_kind
875 .clone()
876 .unwrap_or_else(|| "artifact".to_string()),
877 "escalation" => "plan".to_string(),
878 _ => "artifact".to_string(),
879 }];
880 }
881 if node.retry_policy.max_attempts == 0 {
882 node.retry_policy.max_attempts = 1;
883 }
884 }
885 Ok(graph)
886}
887
888pub fn validate_workflow(
889 graph: &WorkflowGraph,
890 ceiling: Option<&CapabilityPolicy>,
891) -> WorkflowValidationReport {
892 let mut errors = Vec::new();
893 let mut warnings = Vec::new();
894
895 if !graph.nodes.contains_key(&graph.entry) {
896 errors.push(format!("entry node does not exist: {}", graph.entry));
897 }
898
899 let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
900 for edge in &graph.edges {
901 if !node_ids.contains(&edge.from) {
902 errors.push(format!("edge.from references unknown node: {}", edge.from));
903 }
904 if !node_ids.contains(&edge.to) {
905 errors.push(format!("edge.to references unknown node: {}", edge.to));
906 }
907 }
908
909 let reachable_nodes = reachable_nodes(graph);
910 for node_id in &node_ids {
911 if !reachable_nodes.contains(node_id) {
912 warnings.push(format!("node is unreachable: {node_id}"));
913 }
914 }
915
916 for (node_id, node) in &graph.nodes {
917 let incoming = graph
918 .edges
919 .iter()
920 .filter(|edge| edge.to == *node_id)
921 .count();
922 let outgoing: Vec<&WorkflowEdge> = graph
923 .edges
924 .iter()
925 .filter(|edge| edge.from == *node_id)
926 .collect();
927 if let Some(min_inputs) = node.input_contract.min_inputs {
928 if let Some(max_inputs) = node.input_contract.max_inputs {
929 if min_inputs > max_inputs {
930 errors.push(format!(
931 "node {node_id}: input contract min_inputs exceeds max_inputs"
932 ));
933 }
934 }
935 }
936 match node.kind.as_str() {
937 "condition" => {
938 let has_true = outgoing
939 .iter()
940 .any(|edge| edge.branch.as_deref() == Some("true"));
941 let has_false = outgoing
942 .iter()
943 .any(|edge| edge.branch.as_deref() == Some("false"));
944 if !has_true || !has_false {
945 errors.push(format!(
946 "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
947 ));
948 }
949 }
950 "fork" if outgoing.len() < 2 => {
951 errors.push(format!(
952 "node {node_id}: fork nodes require at least two outgoing edges"
953 ));
954 }
955 "join" if incoming < 2 => {
956 warnings.push(format!(
957 "node {node_id}: join node has fewer than two incoming edges"
958 ));
959 }
960 "map"
961 if node.map_policy.items.is_empty()
962 && node.map_policy.item_artifact_kind.is_none()
963 && node.input_contract.input_kinds.is_empty() =>
964 {
965 errors.push(format!(
966 "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
967 ));
968 }
969 "reduce" if node.input_contract.input_kinds.is_empty() => {
970 warnings.push(format!(
971 "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
972 ));
973 }
974 _ => {}
975 }
976 }
977
978 if let Some(ceiling) = ceiling {
979 if let Err(error) = ceiling.intersect(&graph.capability_policy) {
980 errors.push(error);
981 }
982 for (node_id, node) in &graph.nodes {
983 if let Err(error) = ceiling.intersect(&node.capability_policy) {
984 errors.push(format!("node {node_id}: {error}"));
985 }
986 }
987 }
988
989 WorkflowValidationReport {
990 valid: errors.is_empty(),
991 errors,
992 warnings,
993 reachable_nodes: reachable_nodes.into_iter().collect(),
994 }
995}
996
997fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
998 let mut seen = BTreeSet::new();
999 let mut stack = vec![graph.entry.clone()];
1000 while let Some(node_id) = stack.pop() {
1001 if !seen.insert(node_id.clone()) {
1002 continue;
1003 }
1004 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
1005 stack.push(edge.to.clone());
1006 }
1007 }
1008 seen
1009}
1010
1011fn resolve_stage_skill_registry(node: &WorkflowNode) -> Option<VmValue> {
1023 let per_node = node
1024 .raw_model_policy
1025 .as_ref()
1026 .and_then(|v| v.as_dict())
1027 .and_then(|d| d.get("skills"))
1028 .cloned()
1029 .and_then(normalize_inline_registry);
1030 if per_node.is_some() {
1031 return per_node;
1032 }
1033 super::current_workflow_skill_context().and_then(|ctx| ctx.registry)
1034}
1035
1036fn resolve_stage_skill_match(node: &WorkflowNode) -> crate::llm::SkillMatchConfig {
1040 let per_node = node
1041 .raw_model_policy
1042 .as_ref()
1043 .and_then(|v| v.as_dict())
1044 .and_then(|d| d.get("skill_match"))
1045 .and_then(|v| v.as_dict().cloned());
1046 if let Some(dict) = per_node {
1047 return crate::llm::parse_skill_match_config_dict(&dict);
1048 }
1049 super::current_workflow_skill_context()
1050 .and_then(|ctx| ctx.match_config)
1051 .and_then(|v| v.as_dict().cloned())
1052 .map(|d| crate::llm::parse_skill_match_config_dict(&d))
1053 .unwrap_or_default()
1054}
1055
1056fn normalize_inline_registry(value: VmValue) -> Option<VmValue> {
1061 use std::collections::BTreeMap;
1062 use std::rc::Rc;
1063 match &value {
1064 VmValue::Dict(d)
1065 if d.get("_type")
1066 .map(|v| v.display() == "skill_registry")
1067 .unwrap_or(false) =>
1068 {
1069 Some(value)
1070 }
1071 VmValue::List(list) => {
1072 let mut dict = BTreeMap::new();
1073 dict.insert(
1074 "_type".to_string(),
1075 VmValue::String(Rc::from("skill_registry")),
1076 );
1077 dict.insert("skills".to_string(), VmValue::List(list.clone()));
1078 Some(VmValue::Dict(Rc::new(dict)))
1079 }
1080 _ => None,
1081 }
1082}
1083
1084fn resolve_node_session_id(node: &WorkflowNode) -> String {
1085 if let Some(explicit) = node
1086 .raw_model_policy
1087 .as_ref()
1088 .and_then(|v| v.as_dict())
1089 .and_then(|d| d.get("session_id"))
1090 .and_then(|v| match v {
1091 VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
1092 _ => None,
1093 })
1094 {
1095 return explicit;
1096 }
1097 format!("workflow_stage_{}", uuid::Uuid::now_v7())
1098}
1099
1100pub async fn execute_stage_node(
1101 node_id: &str,
1102 node: &WorkflowNode,
1103 task: &str,
1104 artifacts: &[ArtifactRecord],
1105) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1106 let mut selection_policy = node.context_policy.clone();
1107 if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
1108 selection_policy.include_kinds = node.input_contract.input_kinds.clone();
1109 }
1110 let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
1111 let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
1112 let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1113 let rendered_verification = super::render_verification_context(&verification_contracts);
1114 let stage_session_id = resolve_node_session_id(node);
1115 if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1116 return Err(VmError::Runtime(format!(
1117 "workflow stage {node_id} requires an existing session \
1118 (call agent_session_open and feed session_id through model_policy \
1119 before entering this stage)"
1120 )));
1121 }
1122 if let Some(min_inputs) = node.input_contract.min_inputs {
1123 if selected.len() < min_inputs {
1124 return Err(VmError::Runtime(format!(
1125 "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1126 )));
1127 }
1128 }
1129 if let Some(max_inputs) = node.input_contract.max_inputs {
1130 if selected.len() > max_inputs {
1131 return Err(VmError::Runtime(format!(
1132 "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1133 )));
1134 }
1135 }
1136 let prompt = super::render_workflow_prompt(
1137 task,
1138 node.task_label.as_deref(),
1139 &rendered_verification,
1140 &rendered_context,
1141 );
1142
1143 let tool_format = node
1151 .model_policy
1152 .tool_format
1153 .clone()
1154 .filter(|value| !value.trim().is_empty())
1155 .or_else(|| {
1156 std::env::var("HARN_AGENT_TOOL_FORMAT")
1157 .ok()
1158 .filter(|value| !value.trim().is_empty())
1159 })
1160 .unwrap_or_else(|| {
1161 let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
1162 let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
1163 crate::llm_config::default_tool_format(&model, &provider)
1164 });
1165 let mut llm_result = if node.kind == "verify" {
1166 if let Some(command) = node
1167 .verify
1168 .as_ref()
1169 .and_then(|verify| verify.as_object())
1170 .and_then(|verify| verify.get("command"))
1171 .and_then(|value| value.as_str())
1172 .map(str::trim)
1173 .filter(|value| !value.is_empty())
1174 {
1175 let mut process = if cfg!(target_os = "windows") {
1176 let mut cmd = tokio::process::Command::new("cmd");
1177 cmd.arg("/C").arg(command);
1178 cmd
1179 } else {
1180 let mut cmd = tokio::process::Command::new("/bin/sh");
1181 cmd.arg("-lc").arg(command);
1182 cmd
1183 };
1184 process.stdin(std::process::Stdio::null());
1185 if let Some(context) = crate::stdlib::process::current_execution_context() {
1186 if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1187 process.current_dir(cwd);
1188 }
1189 if !context.env.is_empty() {
1190 process.envs(context.env);
1191 }
1192 }
1193 let output = process
1194 .output()
1195 .await
1196 .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
1197 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1198 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1199 let combined = if stderr.is_empty() {
1200 stdout.clone()
1201 } else if stdout.is_empty() {
1202 stderr.clone()
1203 } else {
1204 format!("{stdout}\n{stderr}")
1205 };
1206 serde_json::json!({
1207 "status": "completed",
1208 "text": combined,
1209 "visible_text": combined,
1210 "command": command,
1211 "stdout": stdout,
1212 "stderr": stderr,
1213 "exit_status": output.status.code().unwrap_or(-1),
1214 "success": output.status.success(),
1215 })
1216 } else {
1217 serde_json::json!({
1218 "status": "completed",
1219 "text": "",
1220 "visible_text": "",
1221 })
1222 }
1223 } else {
1224 let mut options = BTreeMap::new();
1225 if let Some(provider) = &node.model_policy.provider {
1226 options.insert(
1227 "provider".to_string(),
1228 VmValue::String(Rc::from(provider.clone())),
1229 );
1230 }
1231 if let Some(model) = &node.model_policy.model {
1232 options.insert(
1233 "model".to_string(),
1234 VmValue::String(Rc::from(model.clone())),
1235 );
1236 }
1237 if let Some(model_tier) = &node.model_policy.model_tier {
1238 options.insert(
1239 "model_tier".to_string(),
1240 VmValue::String(Rc::from(model_tier.clone())),
1241 );
1242 }
1243 if let Some(temperature) = node.model_policy.temperature {
1244 options.insert("temperature".to_string(), VmValue::Float(temperature));
1245 }
1246 if let Some(max_tokens) = node.model_policy.max_tokens {
1247 options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
1248 }
1249 let tool_names = workflow_tool_names(&node.tools);
1250 let tools_value = node.raw_tools.clone().or_else(|| {
1251 if matches!(node.tools, serde_json::Value::Null) {
1252 None
1253 } else {
1254 Some(crate::stdlib::json_to_vm_value(&node.tools))
1255 }
1256 });
1257 if tools_value.is_some() && !tool_names.is_empty() {
1258 options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
1259 }
1260 options.insert(
1261 "session_id".to_string(),
1262 VmValue::String(Rc::from(stage_session_id.clone())),
1263 );
1264
1265 let args = vec![
1266 VmValue::String(Rc::from(prompt.clone())),
1267 node.system
1268 .clone()
1269 .map(|s| VmValue::String(Rc::from(s)))
1270 .unwrap_or(VmValue::Nil),
1271 VmValue::Dict(Rc::new(options)),
1272 ];
1273 let mut opts = extract_llm_options(&args)?;
1274
1275 if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
1276 let tool_policy = workflow_tool_policy_from_tools(&node.tools);
1277 let effective_policy = tool_policy
1278 .intersect(&node.capability_policy)
1279 .map_err(VmError::Runtime)?;
1280 let auto_compact = if node.auto_compact.enabled {
1281 let mut ac = crate::orchestration::AutoCompactConfig::default();
1282 if let Some(v) = node.auto_compact.token_threshold {
1283 ac.token_threshold = v;
1284 }
1285 if let Some(v) = node.auto_compact.tool_output_max_chars {
1286 ac.tool_output_max_chars = v;
1287 }
1288 if let Some(ref strategy) = node.auto_compact.compact_strategy {
1289 if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1290 ac.compact_strategy = s;
1291 }
1292 }
1293 if let Some(v) = node.auto_compact.hard_limit_tokens {
1294 ac.hard_limit_tokens = Some(v);
1295 }
1296 if let Some(ref strategy) = node.auto_compact.hard_limit_strategy {
1297 if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1298 ac.hard_limit_strategy = s;
1299 }
1300 }
1301 if let Some(ref raw_ac) = node.raw_auto_compact {
1304 if let Some(dict) = raw_ac.as_dict() {
1305 if let Some(cb) = dict.get("compress_callback") {
1306 ac.compress_callback = Some(cb.clone());
1307 }
1308 if let Some(cb) = dict.get("mask_callback") {
1309 ac.mask_callback = Some(cb.clone());
1310 }
1311 if let Some(cb) = dict.get("custom_compactor") {
1312 ac.custom_compactor = Some(cb.clone());
1313 }
1314 }
1315 }
1316 {
1317 let user_specified_threshold = node.auto_compact.token_threshold.is_some();
1318 let user_specified_hard_limit = node.auto_compact.hard_limit_tokens.is_some();
1319 crate::llm::api::adapt_auto_compact_to_provider(
1320 &mut ac,
1321 user_specified_threshold,
1322 user_specified_hard_limit,
1323 &opts.provider,
1324 &opts.model,
1325 &opts.api_key,
1326 )
1327 .await;
1328 }
1329 Some(ac)
1330 } else {
1331 None
1332 };
1333 crate::llm::run_agent_loop_internal(
1334 &mut opts,
1335 crate::llm::AgentLoopConfig {
1336 persistent: true,
1337 max_iterations: node.model_policy.max_iterations.unwrap_or(16),
1338 max_nudges: node.model_policy.max_nudges.unwrap_or(3),
1339 nudge: node.model_policy.nudge.clone(),
1340 done_sentinel: node.done_sentinel.clone(),
1341 break_unless_phase: None,
1342 tool_retries: 0,
1343 tool_backoff_ms: 1000,
1344 tool_format: tool_format.clone(),
1345 auto_compact,
1346 policy: Some(effective_policy),
1347 approval_policy: Some(node.approval_policy.clone()),
1348 daemon: false,
1349 daemon_config: Default::default(),
1350 llm_retries: 2,
1351 llm_backoff_ms: 2000,
1352 token_budget: None,
1353 exit_when_verified: node.exit_when_verified,
1354 loop_detect_warn: 2,
1355 loop_detect_block: 3,
1356 loop_detect_skip: 4,
1357 tool_examples: node.model_policy.tool_examples.clone(),
1358 turn_policy: node.model_policy.turn_policy.clone(),
1359 stop_after_successful_tools: node
1360 .model_policy
1361 .stop_after_successful_tools
1362 .clone(),
1363 require_successful_tools: node.model_policy.require_successful_tools.clone(),
1364 session_id: stage_session_id.clone(),
1368 event_sink: None,
1369 task_ledger: node
1373 .raw_model_policy
1374 .as_ref()
1375 .and_then(|v| v.as_dict())
1376 .and_then(|d| d.get("task_ledger"))
1377 .map(crate::llm::helpers::vm_value_to_json)
1378 .and_then(|json| serde_json::from_value(json).ok())
1379 .unwrap_or_default(),
1380 post_turn_callback: node
1381 .raw_model_policy
1382 .as_ref()
1383 .and_then(|v| v.as_dict())
1384 .and_then(|d| d.get("post_turn_callback"))
1385 .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
1386 .cloned(),
1387 skill_registry: resolve_stage_skill_registry(node),
1394 skill_match: resolve_stage_skill_match(node),
1395 working_files: Vec::new(),
1396 },
1397 )
1398 .await?
1399 } else {
1400 let result = vm_call_llm_full(&opts).await?;
1401 crate::llm::agent_loop_result_from_llm(&result, opts)
1402 }
1403 };
1404 if let Some(payload) = llm_result.as_object_mut() {
1405 payload.insert("prompt".to_string(), serde_json::json!(prompt));
1406 payload.insert(
1407 "system_prompt".to_string(),
1408 serde_json::json!(node.system.clone().unwrap_or_default()),
1409 );
1410 payload.insert(
1411 "rendered_context".to_string(),
1412 serde_json::json!(rendered_context),
1413 );
1414 if !verification_contracts.is_empty() {
1415 payload.insert(
1416 "verification_contracts".to_string(),
1417 serde_json::to_value(&verification_contracts).unwrap_or_default(),
1418 );
1419 payload.insert(
1420 "rendered_verification_context".to_string(),
1421 serde_json::json!(rendered_verification),
1422 );
1423 }
1424 payload.insert(
1425 "selected_artifact_ids".to_string(),
1426 serde_json::json!(selected
1427 .iter()
1428 .map(|artifact| artifact.id.clone())
1429 .collect::<Vec<_>>()),
1430 );
1431 payload.insert(
1432 "selected_artifact_titles".to_string(),
1433 serde_json::json!(selected
1434 .iter()
1435 .map(|artifact| artifact.title.clone())
1436 .collect::<Vec<_>>()),
1437 );
1438 payload.insert(
1439 "tool_calling_mode".to_string(),
1440 serde_json::json!(tool_format.clone()),
1441 );
1442 }
1443
1444 let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1445 let result_transcript = llm_result
1449 .get("transcript")
1450 .cloned()
1451 .map(|value| crate::stdlib::json_to_vm_value(&value));
1452 let session_transcript = crate::agent_sessions::snapshot(&stage_session_id);
1453 let transcript = result_transcript
1454 .or(session_transcript)
1455 .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1456 let output_kind = node
1457 .output_contract
1458 .output_kinds
1459 .first()
1460 .cloned()
1461 .unwrap_or_else(|| {
1462 if node.kind == "verify" {
1463 "verification_result".to_string()
1464 } else {
1465 "artifact".to_string()
1466 }
1467 });
1468 let mut metadata = BTreeMap::new();
1469 metadata.insert(
1470 "input_artifact_ids".to_string(),
1471 serde_json::json!(selected
1472 .iter()
1473 .map(|artifact| artifact.id.clone())
1474 .collect::<Vec<_>>()),
1475 );
1476 metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1477 let artifact = ArtifactRecord {
1478 type_name: "artifact".to_string(),
1479 id: new_id("artifact"),
1480 kind: output_kind,
1481 title: Some(format!("stage {node_id} output")),
1482 text: Some(visible_text),
1483 data: Some(llm_result.clone()),
1484 source: Some(node_id.to_string()),
1485 created_at: now_rfc3339(),
1486 freshness: Some("fresh".to_string()),
1487 priority: None,
1488 lineage: selected
1489 .iter()
1490 .map(|artifact| artifact.id.clone())
1491 .collect(),
1492 relevance: Some(1.0),
1493 estimated_tokens: None,
1494 stage: Some(node_id.to_string()),
1495 metadata,
1496 }
1497 .normalize();
1498
1499 Ok((llm_result, vec![artifact], transcript))
1500}
1501
1502pub fn next_nodes_for(
1503 graph: &WorkflowGraph,
1504 current: &str,
1505 branch: Option<&str>,
1506) -> Vec<WorkflowEdge> {
1507 let mut matching: Vec<WorkflowEdge> = graph
1508 .edges
1509 .iter()
1510 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1511 .cloned()
1512 .collect();
1513 if matching.is_empty() {
1514 matching = graph
1515 .edges
1516 .iter()
1517 .filter(|edge| edge.from == current && edge.branch.is_none())
1518 .cloned()
1519 .collect();
1520 }
1521 matching
1522}
1523
1524pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1525 next_nodes_for(graph, current, Some(branch))
1526 .into_iter()
1527 .next()
1528 .map(|edge| edge.to)
1529}
1530
1531pub fn append_audit_entry(
1532 graph: &mut WorkflowGraph,
1533 op: &str,
1534 node_id: Option<String>,
1535 reason: Option<String>,
1536 metadata: BTreeMap<String, serde_json::Value>,
1537) {
1538 graph.audit_log.push(WorkflowAuditEntry {
1539 id: new_id("audit"),
1540 op: op.to_string(),
1541 node_id,
1542 timestamp: now_rfc3339(),
1543 reason,
1544 metadata,
1545 });
1546}