1use crate::value::VmDictExt;
4use std::collections::{BTreeMap, BTreeSet};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10 BranchSemantics, CapabilityPolicy, ContextPolicy, EqIgnored, EscalationPolicy, JoinPolicy,
11 MapPolicy, ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_surface::{tool_capability_policy_from_spec, tool_names_from_spec};
15use crate::value::{VmError, VmValue};
16
/// Node-metadata key under which verification contracts are stored as a JSON
/// array. Written by `inject_workflow_verification_contracts` and read back by
/// `stage_verification_contracts`.
pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
/// Node-metadata key selecting the verification scope of a stage. When its
/// value is the string `"local_only"`, `stage_verification_contracts` returns
/// only the contract derived from the node's own `verify` block.
pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
19
/// A single node of a workflow graph.
///
/// All fields fall back to their `Default` when missing from the input
/// (`#[serde(default)]`). The `raw_*` fields are `#[serde(skip)]`: they never
/// take part in (de)serialization and are filled in separately from the
/// original VM value (see `parse_workflow_node_value`).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier; `normalize_workflow_value` fills it from the node's map
    /// key when absent.
    pub id: Option<String>,
    /// Node kind. `validate_workflow` has dedicated rules for "condition",
    /// "fork", "join", "map" and "reduce"; an empty kind is normalized to "stage".
    pub kind: String,
    pub mode: Option<String>,
    pub prompt: Option<String>,
    pub system: Option<String>,
    pub task_label: Option<String>,
    pub done_sentinel: Option<String>,
    /// JSON form of the tool specification. `stage_tools_value` prefers
    /// `raw_tools` and only falls back to this when it is not JSON null.
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    pub auto_compact: AutoCompactPolicy,
    // NOTE(review): field-level `default` duplicates the container-level
    // `#[serde(default)]`; presumably kept for explicitness — confirm.
    #[serde(default)]
    pub output_visibility: Option<String>,
    pub context_policy: ContextPolicy,
    pub retry_policy: RetryPolicy,
    /// Per-node capability policy; intersected with the tool-derived policy and
    /// with the validation ceiling elsewhere in this file.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub input_contract: StageContract,
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// JSON `verify` block; `verification_contract_from_verify` derives a
    /// `VerificationContract` from it when it is an object.
    pub verify: Option<serde_json::Value>,
    #[serde(default)]
    pub exit_when_verified: bool,
    /// Free-form metadata. This file reads the verification contract/scope keys
    /// and `worker_session_id` from it.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Original VM value of `tools`, kept so non-JSON content survives parsing.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Original VM value of `auto_compact`.
    #[serde(skip)]
    pub raw_auto_compact: Option<VmValue>,
    /// Original VM value of `model_policy`; its non-nil entries are merged into
    /// the stage LLM options and an explicit `session_id` is read from it.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
    /// Original VM value of `context_assembler`.
    #[serde(skip)]
    pub raw_context_assembler: Option<VmValue>,
    /// Original VM value of `verify`, retained only when it is a closure or a
    /// builtin reference.
    #[serde(skip)]
    pub raw_verify: Option<VmValue>,
}
88
89impl PartialEq for WorkflowNode {
90 fn eq(&self, other: &Self) -> bool {
91 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
92 }
93}
94
/// One atomic check inside a [`VerificationContract`].
///
/// Entries are created through `push_unique_requirement`, which trims all
/// three fields and drops requirements whose kind or value is blank.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationRequirement {
    /// Requirement category. Kinds generated in this file are
    /// "visible_text_contains", "combined_output_contains", "expect_status",
    /// "identifier", "path" and "text"; user-supplied `checks` may carry others.
    pub kind: String,
    /// The text, path, identifier or status the requirement refers to.
    pub value: String,
    /// Optional human-readable explanation.
    pub note: Option<String>,
}
102
/// Verification expectations derived from a node's `verify` block, optionally
/// merged with a contract file and an inline `contract` object
/// (see `verification_contract_from_verify`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationContract {
    /// Id of the node whose `verify` block produced this contract.
    pub source_node: Option<String>,
    pub summary: Option<String>,
    pub command: Option<String>,
    /// Expected exit status; mirrored into `checks` as an "expect_status" entry.
    pub expect_status: Option<i64>,
    /// Mirrored into `checks` as a "visible_text_contains" entry.
    pub assert_text: Option<String>,
    /// Mirrored into `checks` as a "combined_output_contains" entry.
    pub expect_text: Option<String>,
    /// Also accepted under the input key `identifiers`.
    pub required_identifiers: Vec<String>,
    /// Also accepted under the input key `paths`.
    pub required_paths: Vec<String>,
    /// Also accepted under the input keys `exact_text` and `required_strings`.
    pub required_text: Vec<String>,
    pub notes: Vec<String>,
    /// De-duplicated list of explicit and derived requirements.
    pub checks: Vec<VerificationRequirement>,
}
118
119impl VerificationContract {
120 fn is_empty(&self) -> bool {
121 self.summary.is_none()
122 && self.command.is_none()
123 && self.expect_status.is_none()
124 && self.assert_text.is_none()
125 && self.expect_text.is_none()
126 && self.required_identifiers.is_empty()
127 && self.required_paths.is_empty()
128 && self.required_text.is_empty()
129 && self.notes.is_empty()
130 && self.checks.is_empty()
131 }
132}
133
/// Appends the trimmed form of `value` to `values`, unless the trimmed text is
/// empty or an identical entry is already present.
fn push_unique_string(values: &mut Vec<String>, value: &str) {
    let candidate = value.trim();
    let already_present = values.iter().any(|item| item.as_str() == candidate);
    if candidate.is_empty() || already_present {
        return;
    }
    values.push(candidate.to_owned());
}
143
144fn push_unique_requirement(
145 values: &mut Vec<VerificationRequirement>,
146 kind: &str,
147 value: &str,
148 note: Option<&str>,
149) {
150 let trimmed_kind = kind.trim();
151 let trimmed_value = value.trim();
152 let trimmed_note = note
153 .map(str::trim)
154 .filter(|candidate| !candidate.is_empty())
155 .map(|candidate| candidate.to_string());
156 if trimmed_kind.is_empty() || trimmed_value.is_empty() {
157 return;
158 }
159 let candidate = VerificationRequirement {
160 kind: trimmed_kind.to_string(),
161 value: trimmed_value.to_string(),
162 note: trimmed_note,
163 };
164 if !values.iter().any(|existing| existing == &candidate) {
165 values.push(candidate);
166 }
167}
168
169fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
170 match value {
171 Some(serde_json::Value::String(text)) => {
172 let mut values = Vec::new();
173 push_unique_string(&mut values, text);
174 values
175 }
176 Some(serde_json::Value::Array(items)) => {
177 let mut values = Vec::new();
178 for item in items {
179 if let Some(text) = item.as_str() {
180 push_unique_string(&mut values, text);
181 }
182 }
183 values
184 }
185 _ => Vec::new(),
186 }
187}
188
189fn merge_verification_requirement_list(
190 target: &mut Vec<VerificationRequirement>,
191 value: Option<&serde_json::Value>,
192) {
193 let Some(items) = value.and_then(|raw| raw.as_array()) else {
194 return;
195 };
196 for item in items {
197 let Some(object) = item.as_object() else {
198 continue;
199 };
200 let kind = object
201 .get("kind")
202 .and_then(|value| value.as_str())
203 .unwrap_or_default();
204 let value = object
205 .get("value")
206 .and_then(|value| value.as_str())
207 .unwrap_or_default();
208 let note = object
209 .get("note")
210 .or_else(|| object.get("description"))
211 .or_else(|| object.get("reason"))
212 .and_then(|value| value.as_str());
213 push_unique_requirement(target, kind, value, note);
214 }
215}
216
217fn merge_verification_contract_fields(
218 target: &mut VerificationContract,
219 object: &serde_json::Map<String, serde_json::Value>,
220) {
221 if target.summary.is_none() {
222 target.summary = object
223 .get("summary")
224 .and_then(|value| value.as_str())
225 .map(str::trim)
226 .filter(|value| !value.is_empty())
227 .map(|value| value.to_string());
228 }
229 if target.command.is_none() {
230 target.command = object
231 .get("command")
232 .and_then(|value| value.as_str())
233 .map(str::trim)
234 .filter(|value| !value.is_empty())
235 .map(|value| value.to_string());
236 }
237 if target.expect_status.is_none() {
238 target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
239 }
240 if target.assert_text.is_none() {
241 target.assert_text = object
242 .get("assert_text")
243 .and_then(|value| value.as_str())
244 .map(str::trim)
245 .filter(|value| !value.is_empty())
246 .map(|value| value.to_string());
247 }
248 if target.expect_text.is_none() {
249 target.expect_text = object
250 .get("expect_text")
251 .and_then(|value| value.as_str())
252 .map(str::trim)
253 .filter(|value| !value.is_empty())
254 .map(|value| value.to_string());
255 }
256
257 for value in json_string_list(
258 object
259 .get("required_identifiers")
260 .or_else(|| object.get("identifiers")),
261 ) {
262 push_unique_string(&mut target.required_identifiers, &value);
263 }
264 for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
265 push_unique_string(&mut target.required_paths, &value);
266 }
267 for value in json_string_list(
268 object
269 .get("required_text")
270 .or_else(|| object.get("exact_text"))
271 .or_else(|| object.get("required_strings")),
272 ) {
273 push_unique_string(&mut target.required_text, &value);
274 }
275 for value in json_string_list(object.get("notes")) {
276 push_unique_string(&mut target.notes, &value);
277 }
278 merge_verification_requirement_list(&mut target.checks, object.get("checks"));
279}
280
281fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
282 let resolved = crate::stdlib::process::resolve_source_asset_path(path);
283 let contents = std::fs::read_to_string(&resolved).map_err(|error| {
284 VmError::Runtime(format!(
285 "workflow verification contract read failed for {}: {error}",
286 resolved.display()
287 ))
288 })?;
289 serde_json::from_str(&contents).map_err(|error| {
290 VmError::Runtime(format!(
291 "workflow verification contract parse failed for {}: {error}",
292 resolved.display()
293 ))
294 })
295}
296
297fn resolve_verification_contract_path(
298 verify: &serde_json::Map<String, serde_json::Value>,
299) -> Result<Option<serde_json::Value>, VmError> {
300 let Some(path) = verify
301 .get("contract_path")
302 .or_else(|| verify.get("verification_contract_path"))
303 .and_then(|value| value.as_str())
304 .map(str::trim)
305 .filter(|value| !value.is_empty())
306 else {
307 return Ok(None);
308 };
309 Ok(Some(load_verification_contract_file(path)?))
310}
311
312pub fn verification_contract_from_verify(
313 node_id: &str,
314 verify: Option<&serde_json::Value>,
315) -> Result<Option<VerificationContract>, VmError> {
316 let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
317 return Ok(None);
318 };
319
320 let mut contract = VerificationContract {
321 source_node: Some(node_id.to_string()),
322 ..Default::default()
323 };
324
325 if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
326 let Some(object) = file_contract.as_object() else {
327 return Err(VmError::Runtime(
328 "workflow verification contract file must parse to a JSON object".to_string(),
329 ));
330 };
331 merge_verification_contract_fields(&mut contract, object);
332 }
333
334 if let Some(inline_contract) = verify_object.get("contract") {
335 let Some(object) = inline_contract.as_object() else {
336 return Err(VmError::Runtime(
337 "workflow verify.contract must be an object".to_string(),
338 ));
339 };
340 merge_verification_contract_fields(&mut contract, object);
341 }
342
343 merge_verification_contract_fields(&mut contract, verify_object);
344
345 if let Some(assert_text) = contract.assert_text.clone() {
346 push_unique_requirement(
347 &mut contract.checks,
348 "visible_text_contains",
349 &assert_text,
350 Some("verify stage requires visible output to contain this text"),
351 );
352 }
353 if let Some(expect_text) = contract.expect_text.clone() {
354 push_unique_requirement(
355 &mut contract.checks,
356 "combined_output_contains",
357 &expect_text,
358 Some("verify command requires combined stdout/stderr to contain this text"),
359 );
360 }
361 if let Some(expect_status) = contract.expect_status {
362 push_unique_requirement(
363 &mut contract.checks,
364 "expect_status",
365 &expect_status.to_string(),
366 Some("verify command exit status must match exactly"),
367 );
368 }
369 for identifier in contract.required_identifiers.clone() {
370 push_unique_requirement(
371 &mut contract.checks,
372 "identifier",
373 &identifier,
374 Some("use this exact identifier spelling"),
375 );
376 }
377 for path in contract.required_paths.clone() {
378 push_unique_requirement(
379 &mut contract.checks,
380 "path",
381 &path,
382 Some("preserve this exact path"),
383 );
384 }
385 for text in contract.required_text.clone() {
386 push_unique_requirement(
387 &mut contract.checks,
388 "text",
389 &text,
390 Some("required exact text or wiring snippet"),
391 );
392 }
393
394 if contract.is_empty() {
395 return Ok(None);
396 }
397 Ok(Some(contract))
398}
399
400fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
401 if !values.iter().any(|existing| existing == &candidate) {
402 values.push(candidate);
403 }
404}
405
406pub fn workflow_verification_contracts(
407 graph: &WorkflowGraph,
408) -> Result<Vec<VerificationContract>, VmError> {
409 let mut contracts = Vec::new();
410 for (node_id, node) in &graph.nodes {
411 if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
412 push_unique_contract(&mut contracts, contract);
413 }
414 }
415 Ok(contracts)
416}
417
418pub fn inject_workflow_verification_contracts(
419 node: &mut WorkflowNode,
420 contracts: &[VerificationContract],
421) {
422 if contracts.is_empty() {
423 return;
424 }
425 node.metadata.insert(
426 WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
427 serde_json::to_value(contracts).unwrap_or_default(),
428 );
429}
430
431pub fn stage_verification_contracts(
432 node_id: &str,
433 node: &WorkflowNode,
434) -> Result<Vec<VerificationContract>, VmError> {
435 let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
436 let local_only = matches!(
437 node.metadata
438 .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
439 .and_then(|value| value.as_str()),
440 Some("local_only")
441 );
442 if local_only {
443 return Ok(local_contract.into_iter().collect());
444 }
445
446 let mut contracts = node
447 .metadata
448 .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
449 .cloned()
450 .map(|value| {
451 serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
452 VmError::Runtime(format!(
453 "workflow stage {node_id} verification contract metadata parse failed: {error}"
454 ))
455 })
456 })
457 .transpose()?
458 .unwrap_or_default();
459
460 if let Some(local_contract) = local_contract {
461 push_unique_contract(&mut contracts, local_contract);
462 }
463 Ok(contracts)
464}
465
/// A directed edge between two workflow nodes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the source node.
    pub from: String,
    /// Id of the target node.
    pub to: String,
    /// Optional branch name. Values used in this file are "true"/"false" for
    /// condition nodes and "failed"/"retry" for the default repair loop.
    pub branch: Option<String>,
    pub label: Option<String>,
}
474
/// A complete workflow: nodes keyed by id, the edges between them and
/// graph-level policies.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Serialized as `_type`; `normalize_workflow_value` defaults it to
    /// "workflow_graph".
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph id; generated by `normalize_workflow_value` when empty.
    pub id: String,
    pub name: Option<String>,
    /// Graph version; `normalize_workflow_value` raises 0 to 1.
    pub version: usize,
    /// Id of the node where traversal starts.
    pub entry: String,
    /// Nodes keyed by node id (ordered map, so iteration is by key order).
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    /// Graph-level capability policy, checked against the ceiling in
    /// `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}
491
/// One entry of a workflow graph's `audit_log`.
// NOTE(review): the code that writes audit entries is not visible in this part
// of the file; field meanings below the names are presumed, not verified.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    pub op: String,
    pub node_id: Option<String>,
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
502
/// Outcome of `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// `true` exactly when `errors` is empty.
    pub valid: bool,
    /// Problems that make the graph invalid.
    pub errors: Vec<String>,
    /// Findings that do not affect `valid`.
    pub warnings: Vec<String>,
    /// Ids reached from the entry node by following edges, in sorted order.
    pub reachable_nodes: Vec<String>,
}
511
512fn retry_repair_prompt_builder_from_dict(
518 dict: Option<&crate::value::DictMap>,
519) -> Option<EqIgnored<VmValue>> {
520 dict.and_then(|d| d.get("retry_policy"))
521 .and_then(|policy| policy.as_dict())
522 .and_then(|policy| policy.get("repair_prompt_builder"))
523 .filter(|value| !matches!(value, VmValue::Nil))
524 .cloned()
525 .map(EqIgnored)
526}
527
528pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
529 let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
530 let dict = value.as_dict();
531 node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
532 node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
533 node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
534 node.raw_context_assembler = dict.and_then(|d| d.get("context_assembler")).cloned();
535 node.raw_verify = dict
538 .and_then(|d| d.get("verify"))
539 .filter(|value| {
540 matches!(
541 value,
542 VmValue::Closure(_) | VmValue::BuiltinRef(_) | VmValue::BuiltinRefId(_)
543 )
544 })
545 .cloned();
546 node.retry_policy.repair_prompt_builder = retry_repair_prompt_builder_from_dict(dict);
547 Ok(node)
548}
549
550pub fn parse_workflow_node_json(
551 json: serde_json::Value,
552 label: &str,
553) -> Result<WorkflowNode, VmError> {
554 super::parse_json_payload(json, label)
555}
556
557pub fn parse_workflow_edge_json(
558 json: serde_json::Value,
559 label: &str,
560) -> Result<WorkflowEdge, VmError> {
561 super::parse_json_payload(json, label)
562}
563
/// Parses a VM value into a [`WorkflowGraph`] and fills in defaults.
///
/// Two input shapes are handled:
/// * a full graph with a `nodes` map, and
/// * a shorthand where top-level `act` / `verify` / `repair` keys describe the
///   stages; this is used only when the parsed graph has no nodes.
///
/// After that, graph-level defaults (`_type`, `id`, `version`, `entry`) and
/// per-node defaults (id, kind, join/reduce strategy, output kinds, retry
/// attempts) are applied, and raw VM values are re-attached to nodes that came
/// from the `nodes` map.
///
/// # Errors
/// Propagates errors from `parse_json_value` and `parse_workflow_node_value`.
pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
    // Non-dict input is treated as an empty dict for all lookups below.
    let as_dict = value.as_dict().cloned().unwrap_or_default();

    // Shorthand form: build the nodes from top-level `act`/`verify`/`repair`.
    if graph.nodes.is_empty() {
        for key in ["act", "verify", "repair"] {
            if let Some(node_value) = as_dict.get(key) {
                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
                node.id = Some(key.to_string());
                if node.kind.is_empty() {
                    node.kind = if key == "verify" {
                        "verify".to_string()
                    } else {
                        "stage".to_string()
                    };
                }
                // Model settings not set on the node are inherited from the
                // top-level dict.
                if node.model_policy.provider.is_none() {
                    node.model_policy.provider = as_dict
                        .get("provider")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model.is_none() {
                    node.model_policy.model = as_dict
                        .get("model")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model_tier.is_none() {
                    // `tier` is accepted as an alias of `model_tier`.
                    node.model_policy.model_tier = as_dict
                        .get("model_tier")
                        .or_else(|| as_dict.get("tier"))
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.temperature.is_none() {
                    // Accepts a float, or an integer widened to f64.
                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
                        if let VmValue::Float(number) = value {
                            Some(*number)
                        } else {
                            value.as_int().map(|number| number as f64)
                        }
                    });
                }
                if node.model_policy.max_tokens.is_none() {
                    node.model_policy.max_tokens =
                        as_dict.get("max_tokens").and_then(|value| value.as_int());
                }
                if node.mode.is_none() {
                    node.mode = as_dict
                        .get("mode")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.done_sentinel.is_none() {
                    node.done_sentinel = as_dict
                        .get("done_sentinel")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                // Verification keys written directly on the `verify` stage are
                // wrapped into a `verify` object when none was given. Keys
                // that are absent end up as JSON null.
                if key == "verify"
                    && node.verify.is_none()
                    && (raw_node.contains_key("assert_text")
                        || raw_node.contains_key("command")
                        || raw_node.contains_key("expect_status")
                        || raw_node.contains_key("expect_text"))
                {
                    node.verify = Some(serde_json::json!({
                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
                        "command": raw_node.get("command").map(vm_value_to_json),
                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
                    }));
                }
                graph.nodes.insert(key.to_string(), node);
            }
        }
        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
            graph.entry = "act".to_string();
        }
        // Default wiring: act -> verify, plus a verify <-> repair loop.
        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
            if graph.nodes.contains_key("verify") {
                graph.edges.push(WorkflowEdge {
                    from: "act".to_string(),
                    to: "verify".to_string(),
                    branch: None,
                    label: None,
                });
            }
            // NOTE(review): the repair edges are added whenever `repair` exists,
            // even if there is no `verify` node; the edges then reference a
            // missing node, which `validate_workflow` reports as an error.
            // Confirm whether this should also require `verify`.
            if graph.nodes.contains_key("repair") {
                graph.edges.push(WorkflowEdge {
                    from: "verify".to_string(),
                    to: "repair".to_string(),
                    branch: Some("failed".to_string()),
                    label: None,
                });
                graph.edges.push(WorkflowEdge {
                    from: "repair".to_string(),
                    to: "verify".to_string(),
                    branch: Some("retry".to_string()),
                    label: None,
                });
            }
        }
    }

    // Graph-level defaults.
    if graph.type_name.is_empty() {
        graph.type_name = "workflow_graph".to_string();
    }
    if graph.id.is_empty() {
        graph.id = new_id("workflow");
    }
    if graph.version == 0 {
        graph.version = 1;
    }
    if graph.entry.is_empty() {
        // Falls back to the first key of the ordered node map, or "act" when
        // there are no nodes at all.
        graph.entry = graph
            .nodes
            .keys()
            .next()
            .cloned()
            .unwrap_or_else(|| "act".to_string());
    }
    // Per-node defaults.
    for (node_id, node) in &mut graph.nodes {
        // Raw dict of this node under the input's `nodes` map, if present.
        let raw_node = as_dict
            .get("nodes")
            .and_then(|nodes| nodes.as_dict())
            .and_then(|nodes| nodes.get(node_id.as_str()))
            .and_then(|node_value| node_value.as_dict());
        // The raw_* fields are `#[serde(skip)]`, so nodes parsed as part of the
        // graph have them unset; they are restored from the raw dict here.
        if node.raw_tools.is_none() {
            node.raw_tools = raw_node.and_then(|raw_node| raw_node.get("tools")).cloned();
        }
        if node.raw_verify.is_none() {
            // Only callable `verify` values are kept in raw form.
            node.raw_verify = raw_node
                .and_then(|raw_node| raw_node.get("verify"))
                .filter(|value| {
                    matches!(
                        value,
                        VmValue::Closure(_) | VmValue::BuiltinRef(_) | VmValue::BuiltinRefId(_)
                    )
                })
                .cloned();
        }
        if node.retry_policy.repair_prompt_builder.is_none() {
            node.retry_policy.repair_prompt_builder =
                retry_repair_prompt_builder_from_dict(raw_node);
        }
        if node.id.is_none() {
            node.id = Some(node_id.clone());
        }
        if node.kind.is_empty() {
            node.kind = "stage".to_string();
        }
        if node.join_policy.strategy.is_empty() {
            node.join_policy.strategy = "all".to_string();
        }
        if node.reduce_policy.strategy.is_empty() {
            node.reduce_policy.strategy = "concat".to_string();
        }
        if node.output_contract.output_kinds.is_empty() {
            // Default output kind depends on the node kind.
            node.output_contract.output_kinds = vec![match node.kind.as_str() {
                "verify" => "verification_result".to_string(),
                "reduce" => node
                    .reduce_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "summary".to_string()),
                "map" => node
                    .map_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "artifact".to_string()),
                "escalation" => "plan".to_string(),
                _ => "artifact".to_string(),
            }];
        }
        // A retry policy always allows at least one attempt.
        if node.retry_policy.max_attempts == 0 {
            node.retry_policy.max_attempts = 1;
        }
    }
    Ok(graph)
}
749
750pub fn validate_workflow(
751 graph: &WorkflowGraph,
752 ceiling: Option<&CapabilityPolicy>,
753) -> WorkflowValidationReport {
754 let mut errors = Vec::new();
755 let mut warnings = Vec::new();
756
757 if !graph.nodes.contains_key(&graph.entry) {
758 errors.push(format!("entry node does not exist: {}", graph.entry));
759 }
760
761 let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
762 for edge in &graph.edges {
763 if !node_ids.contains(&edge.from) {
764 errors.push(format!("edge.from references unknown node: {}", edge.from));
765 }
766 if !node_ids.contains(&edge.to) {
767 errors.push(format!("edge.to references unknown node: {}", edge.to));
768 }
769 }
770
771 let reachable_nodes = reachable_nodes(graph);
772 for node_id in &node_ids {
773 if !reachable_nodes.contains(node_id) {
774 warnings.push(format!("node is unreachable: {node_id}"));
775 }
776 }
777
778 for (node_id, node) in &graph.nodes {
779 let incoming = graph
780 .edges
781 .iter()
782 .filter(|edge| edge.to == *node_id)
783 .count();
784 let outgoing: Vec<&WorkflowEdge> = graph
785 .edges
786 .iter()
787 .filter(|edge| edge.from == *node_id)
788 .collect();
789 if let Some(min_inputs) = node.input_contract.min_inputs {
790 if let Some(max_inputs) = node.input_contract.max_inputs {
791 if min_inputs > max_inputs {
792 errors.push(format!(
793 "node {node_id}: input contract min_inputs exceeds max_inputs"
794 ));
795 }
796 }
797 }
798 match node.kind.as_str() {
799 "condition" => {
800 let has_true = outgoing
801 .iter()
802 .any(|edge| edge.branch.as_deref() == Some("true"));
803 let has_false = outgoing
804 .iter()
805 .any(|edge| edge.branch.as_deref() == Some("false"));
806 if !has_true || !has_false {
807 errors.push(format!(
808 "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
809 ));
810 }
811 }
812 "fork" if outgoing.len() < 2 => {
813 errors.push(format!(
814 "node {node_id}: fork nodes require at least two outgoing edges"
815 ));
816 }
817 "join" if incoming < 2 => {
818 warnings.push(format!(
819 "node {node_id}: join node has fewer than two incoming edges"
820 ));
821 }
822 "map"
823 if node.map_policy.items.is_empty()
824 && node.map_policy.item_artifact_kind.is_none()
825 && node.input_contract.input_kinds.is_empty() =>
826 {
827 errors.push(format!(
828 "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
829 ));
830 }
831 "reduce" if node.input_contract.input_kinds.is_empty() => {
832 warnings.push(format!(
833 "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
834 ));
835 }
836 _ => {}
837 }
838 }
839
840 if let Some(ceiling) = ceiling {
841 if let Err(error) = ceiling.intersect(&graph.capability_policy) {
842 errors.push(error);
843 }
844 for (node_id, node) in &graph.nodes {
845 if let Err(error) = ceiling.intersect(&node.capability_policy) {
846 errors.push(format!("node {node_id}: {error}"));
847 }
848 }
849 }
850
851 for diagnostic in crate::tool_surface::validate_workflow_graph(graph) {
852 let message = format!("{}: {}", diagnostic.code, diagnostic.message);
853 match diagnostic.severity {
854 crate::tool_surface::ToolSurfaceSeverity::Error => errors.push(message),
855 crate::tool_surface::ToolSurfaceSeverity::Warning => warnings.push(message),
856 }
857 }
858
859 WorkflowValidationReport {
860 valid: errors.is_empty(),
861 errors,
862 warnings,
863 reachable_nodes: reachable_nodes.into_iter().collect(),
864 }
865}
866
867fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
868 let mut seen = BTreeSet::new();
869 let mut stack = vec![graph.entry.clone()];
870 while let Some(node_id) = stack.pop() {
871 if !seen.insert(node_id.clone()) {
872 continue;
873 }
874 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
875 stack.push(edge.to.clone());
876 }
877 }
878 seen
879}
880
881fn resolve_node_session_id(node: &WorkflowNode) -> String {
887 if let Some(explicit) = node
888 .raw_model_policy
889 .as_ref()
890 .and_then(|v| v.as_dict())
891 .and_then(|d| d.get("session_id"))
892 .and_then(|v| match v {
893 VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
894 _ => None,
895 })
896 {
897 return explicit;
898 }
899 if let Some(persisted) = node
900 .metadata
901 .get("worker_session_id")
902 .and_then(|value| value.as_str())
903 .filter(|value| !value.trim().is_empty())
904 {
905 return persisted.to_string();
906 }
907 format!("workflow_stage_{}", uuid::Uuid::now_v7())
908}
909
910fn raw_model_policy_dict(node: &WorkflowNode) -> Option<&crate::value::DictMap> {
911 node.raw_model_policy
912 .as_ref()
913 .and_then(|value| value.as_dict())
914}
915
916fn insert_json_vm_option<T: Serialize>(
917 options: &mut crate::value::DictMap,
918 key: &str,
919 value: &T,
920) -> Result<(), VmError> {
921 let json = serde_json::to_value(value).map_err(|error| {
922 VmError::Runtime(format!("workflow stage option encode error: {error}"))
923 })?;
924 options.insert(
925 crate::value::intern_key(key),
926 crate::stdlib::json_to_vm_value(&json),
927 );
928 Ok(())
929}
930
931fn merge_raw_model_policy_options(options: &mut crate::value::DictMap, node: &WorkflowNode) {
932 if let Some(raw) = raw_model_policy_dict(node) {
933 for (key, value) in raw {
934 if !matches!(value, VmValue::Nil) {
935 options.insert(key.clone(), value.clone());
936 }
937 }
938 }
939}
940
941fn stage_tools_value(node: &WorkflowNode) -> Option<VmValue> {
942 node.raw_tools.clone().or_else(|| {
943 if matches!(node.tools, serde_json::Value::Null) {
944 None
945 } else {
946 Some(crate::stdlib::json_to_vm_value(&node.tools))
947 }
948 })
949}
950
951fn add_stage_tools_option(
952 options: &mut crate::value::DictMap,
953 tools_value: &Option<VmValue>,
954 tool_names: &[String],
955) {
956 if !tool_names.is_empty() {
957 if let Some(value) = tools_value.clone() {
958 options.insert(crate::value::intern_key("tools"), value);
959 }
960 }
961}
962
963fn workflow_stage_llm_options(
964 node: &WorkflowNode,
965 stage_session_id: &str,
966 tools_value: &Option<VmValue>,
967 tool_names: &[String],
968 stage_agent_options: &super::WorkflowStageAgentOptions,
969) -> crate::value::DictMap {
970 let mut options = stage_agent_options.llm_options_vm_dict();
971 merge_raw_model_policy_options(&mut options, node);
972 options.put_str("session_id", stage_session_id);
973 options.put_str("tool_format", stage_agent_options.tool_format.clone());
974 add_stage_tools_option(&mut options, tools_value, tool_names);
975 options
976}
977
978async fn workflow_stage_agent_loop_options(
989 ctx: &crate::vm::AsyncBuiltinCtx,
990 node: &WorkflowNode,
991 stage_session_id: &str,
992 tools_value: &Option<VmValue>,
993 tool_names: &[String],
994 stage_agent_options: &super::WorkflowStageAgentOptions,
995) -> Result<crate::value::DictMap, VmError> {
996 let tool_policy = tool_capability_policy_from_spec(&node.tools);
999 let effective_policy = tool_policy
1000 .intersect(&node.capability_policy)
1001 .map_err(VmError::Runtime)?;
1002
1003 let stage_label = node
1004 .id
1005 .clone()
1006 .unwrap_or_else(|| stage_session_id.to_string());
1007
1008 let mut config = crate::value::DictMap::new();
1009 config.insert(
1010 crate::value::intern_key("base"),
1011 VmValue::dict(stage_agent_options.agent_loop_options_vm_dict()),
1012 );
1013 config.insert(
1014 crate::value::intern_key("raw_model_policy"),
1015 node.raw_model_policy.clone().unwrap_or(VmValue::Nil),
1016 );
1017 insert_json_vm_option(&mut config, "auto_compact", &node.auto_compact)?;
1018 config.insert(
1019 crate::value::intern_key("raw_auto_compact"),
1020 node.raw_auto_compact.clone().unwrap_or(VmValue::Nil),
1021 );
1022 config.insert(
1025 crate::value::intern_key("tools"),
1026 if tool_names.is_empty() {
1027 VmValue::Nil
1028 } else {
1029 tools_value.clone().unwrap_or(VmValue::Nil)
1030 },
1031 );
1032 if let Some(context) = crate::orchestration::current_workflow_skill_context() {
1033 if let Some(registry) = context.registry {
1034 config.insert(crate::value::intern_key("skills"), registry);
1035 }
1036 if let Some(match_config) = context.match_config {
1037 config.insert(crate::value::intern_key("skill_match"), match_config);
1038 }
1039 }
1040 insert_json_vm_option(&mut config, "policy", &effective_policy)?;
1041 insert_json_vm_option(&mut config, "approval_policy", &node.approval_policy)?;
1042 config.put_str("session_id", stage_session_id);
1043 config.put_str("tool_format", stage_agent_options.tool_format.clone());
1044 config.put_str(
1045 "nested_kind",
1046 crate::orchestration::NestedExecutionKind::WorkflowStage.as_str(),
1047 );
1048 config.put_str("nested_label", stage_label);
1049
1050 let flattened = crate::stdlib::harn_entry::call_harn_export_by_name(
1051 ctx,
1052 "std/workflow/stage",
1053 "workflow_flatten_agent_loop_options",
1054 "workflow_flatten_agent_loop_options",
1055 &[VmValue::dict(config)],
1056 )
1057 .await?;
1058 let VmValue::Dict(options) = flattened else {
1059 return Err(VmError::Runtime(
1060 "workflow_flatten_agent_loop_options must return a dict".to_string(),
1061 ));
1062 };
1063 let options = (*options).clone();
1064 enforce_flattened_ceiling(&options, &effective_policy)?;
1065 Ok(options)
1066}
1067
1068fn enforce_flattened_ceiling(
1075 options: &crate::value::DictMap,
1076 ceiling: &CapabilityPolicy,
1077) -> Result<(), VmError> {
1078 let Some(policy_value) = options.get("policy") else {
1079 return Err(VmError::Runtime(
1080 "flattened stage options are missing the capability policy".to_string(),
1081 ));
1082 };
1083 let requested: CapabilityPolicy = serde_json::from_value(vm_value_to_json(policy_value))
1084 .map_err(|error| {
1085 VmError::Runtime(format!(
1086 "flattened stage capability policy is malformed: {error}"
1087 ))
1088 })?;
1089 ceiling
1090 .assert_within_ceiling(&requested)
1091 .map_err(|message| VmError::CategorizedError {
1092 message,
1093 category: crate::value::ErrorCategory::ToolRejected,
1094 })
1095}
1096
/// Everything `prepare_stage_node` computes for a stage before it is executed.
// NOTE(review): the code that populates this struct lies beyond the visible
// part of the file; field notes below are limited to what the names and types
// state, and should be confirmed against `prepare_stage_node`.
#[derive(Clone, Debug)]
pub struct PreparedWorkflowStageNode {
    pub prompt: String,
    pub system: Option<String>,
    /// Presumably selects the agent loop over a single LLM call — confirm.
    pub run_agent_loop: bool,
    pub llm_options: crate::value::DictMap,
    pub agent_loop_options: crate::value::DictMap,
    pub result: Option<serde_json::Value>,
    /// Input artifacts selected for the stage.
    pub selected: Vec<ArtifactRecord>,
    pub rendered_context: String,
    pub rendered_verification: String,
    /// Verification contracts that apply to the stage.
    pub verification_contracts: Vec<VerificationContract>,
    pub tool_format: String,
    /// Session id the stage runs under.
    pub stage_session_id: String,
}
1112
1113pub async fn prepare_stage_node(
1114 ctx: &crate::vm::AsyncBuiltinCtx,
1115 node_id: &str,
1116 node: &WorkflowNode,
1117 task: &str,
1118 artifacts: &[ArtifactRecord],
1119) -> Result<PreparedWorkflowStageNode, VmError> {
1120 let selected_stage = super::select_workflow_stage_artifacts(
1121 ctx,
1122 artifacts,
1123 &node.context_policy,
1124 &node.input_contract,
1125 )
1126 .await?;
1127 let selected = selected_stage.artifacts;
1128 let context_policy = selected_stage.context_policy;
1129 let rendered_context_override = if let Some(assembler) = node.raw_context_assembler.as_ref() {
1130 let assembled =
1131 crate::stdlib::assemble::assemble_from_options(ctx, &selected, assembler).await?;
1132 Some(super::render_assembled_chunks(&assembled))
1133 } else {
1134 None
1135 };
1136 let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1137 let stage_session_id = resolve_node_session_id(node);
1138 if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1139 return Err(VmError::Runtime(format!(
1140 "workflow stage {node_id} requires an existing session \
1141 (call agent_session_open and feed session_id through model_policy \
1142 before entering this stage)"
1143 )));
1144 }
1145 if let Some(min_inputs) = node.input_contract.min_inputs {
1146 if selected.len() < min_inputs {
1147 return Err(VmError::Runtime(format!(
1148 "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1149 )));
1150 }
1151 }
1152 if let Some(max_inputs) = node.input_contract.max_inputs {
1153 if selected.len() > max_inputs {
1154 return Err(VmError::Runtime(format!(
1155 "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1156 )));
1157 }
1158 }
1159 let prepared_prompt = super::prepare_workflow_stage_prompt(
1160 ctx,
1161 task,
1162 node.task_label.as_deref(),
1163 &selected,
1164 &context_policy,
1165 rendered_context_override.as_deref(),
1166 &verification_contracts,
1167 )
1168 .await?;
1169 let prompt = prepared_prompt.prompt;
1170 let rendered_context = prepared_prompt.rendered_context;
1171 let rendered_verification = prepared_prompt.rendered_verification;
1172
1173 let tool_names = tool_names_from_spec(&node.tools);
1174 let stage_agent_options = super::prepare_workflow_stage_agent_options(
1175 ctx,
1176 node,
1177 &stage_session_id,
1178 !tool_names.is_empty(),
1179 )
1180 .await?;
1181 let tool_format = stage_agent_options.tool_format.clone();
1182 let result = if node.kind == "verify" {
1183 if let Some(command) = node
1184 .verify
1185 .as_ref()
1186 .and_then(|verify| verify.as_object())
1187 .and_then(|verify| verify.get("command"))
1188 .and_then(|value| value.as_str())
1189 .map(str::trim)
1190 .filter(|value| !value.is_empty())
1191 {
1192 let (program, args) = if cfg!(target_os = "windows") {
1193 ("cmd", vec!["/C".to_string(), command.to_string()])
1194 } else {
1195 ("/bin/sh", vec!["-c".to_string(), command.to_string()])
1199 };
1200 let mut process_config = crate::stdlib::sandbox::ProcessCommandConfig {
1201 stdin_null: true,
1202 ..Default::default()
1203 };
1204 if let Some(context) = crate::stdlib::process::current_execution_context() {
1205 if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1206 crate::stdlib::sandbox::enforce_process_cwd(std::path::Path::new(&cwd))?;
1207 process_config.cwd = Some(std::path::PathBuf::from(cwd));
1208 }
1209 if !context.env.is_empty() {
1210 process_config.env.extend(context.env);
1211 }
1212 }
1213 let output = crate::stdlib::sandbox::command_output(program, &args, &process_config)?;
1214 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1215 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1216 let combined = if stderr.is_empty() {
1217 stdout.clone()
1218 } else if stdout.is_empty() {
1219 stderr.clone()
1220 } else {
1221 format!("{stdout}\n{stderr}")
1222 };
1223 serde_json::json!({
1224 "status": "completed",
1225 "text": combined,
1226 "visible_text": combined,
1227 "command": command,
1228 "stdout": stdout,
1229 "stderr": stderr,
1230 "exit_status": output.status.code().unwrap_or(-1),
1231 "success": output.status.success(),
1232 })
1233 } else {
1234 serde_json::json!({
1235 "status": "completed",
1236 "text": "",
1237 "visible_text": "",
1238 })
1239 }
1240 } else {
1241 let tools_value = stage_tools_value(node);
1242 let llm_options = workflow_stage_llm_options(
1243 node,
1244 &stage_session_id,
1245 &tools_value,
1246 &tool_names,
1247 &stage_agent_options,
1248 );
1249 let agent_loop_options = if stage_agent_options.run_agent_loop {
1250 workflow_stage_agent_loop_options(
1251 ctx,
1252 node,
1253 &stage_session_id,
1254 &tools_value,
1255 &tool_names,
1256 &stage_agent_options,
1257 )
1258 .await?
1259 } else {
1260 crate::value::DictMap::new()
1261 };
1262 return Ok(PreparedWorkflowStageNode {
1263 prompt,
1264 system: node.system.clone(),
1265 run_agent_loop: stage_agent_options.run_agent_loop,
1266 llm_options,
1267 agent_loop_options,
1268 result: None,
1269 selected,
1270 rendered_context,
1271 rendered_verification,
1272 verification_contracts,
1273 tool_format,
1274 stage_session_id,
1275 });
1276 };
1277
1278 Ok(PreparedWorkflowStageNode {
1279 prompt,
1280 system: node.system.clone(),
1281 run_agent_loop: false,
1282 llm_options: crate::value::DictMap::new(),
1283 agent_loop_options: crate::value::DictMap::new(),
1284 result: Some(result),
1285 selected,
1286 rendered_context,
1287 rendered_verification,
1288 verification_contracts,
1289 tool_format,
1290 stage_session_id,
1291 })
1292}
1293
1294pub fn complete_prepared_stage_node(
1295 node_id: &str,
1296 node: &WorkflowNode,
1297 prepared: &PreparedWorkflowStageNode,
1298 mut llm_result: serde_json::Value,
1299) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1300 if let Some(payload) = llm_result.as_object_mut() {
1301 payload.insert(
1302 "prompt".to_string(),
1303 serde_json::json!(prepared.prompt.clone()),
1304 );
1305 payload.insert(
1306 "system_prompt".to_string(),
1307 serde_json::json!(node.system.clone().unwrap_or_default()),
1308 );
1309 payload.insert(
1310 "rendered_context".to_string(),
1311 serde_json::json!(prepared.rendered_context.clone()),
1312 );
1313 if !prepared.verification_contracts.is_empty() {
1314 payload.insert(
1315 "verification_contracts".to_string(),
1316 serde_json::to_value(&prepared.verification_contracts).unwrap_or_default(),
1317 );
1318 payload.insert(
1319 "rendered_verification_context".to_string(),
1320 serde_json::json!(prepared.rendered_verification.clone()),
1321 );
1322 }
1323 payload.insert(
1324 "selected_artifact_ids".to_string(),
1325 serde_json::json!(prepared
1326 .selected
1327 .iter()
1328 .map(|artifact| artifact.id.clone())
1329 .collect::<Vec<_>>()),
1330 );
1331 payload.insert(
1332 "selected_artifact_titles".to_string(),
1333 serde_json::json!(prepared
1334 .selected
1335 .iter()
1336 .map(|artifact| artifact.title.clone())
1337 .collect::<Vec<_>>()),
1338 );
1339 match payload
1340 .entry("tools".to_string())
1341 .or_insert_with(|| serde_json::json!({}))
1342 {
1343 serde_json::Value::Object(tools) => {
1344 tools.insert(
1345 "mode".to_string(),
1346 serde_json::json!(prepared.tool_format.clone()),
1347 );
1348 }
1349 slot => {
1350 *slot = serde_json::json!({ "mode": prepared.tool_format.clone() });
1351 }
1352 }
1353 }
1354
1355 let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1356 let result_transcript = llm_result
1360 .get("transcript")
1361 .cloned()
1362 .map(|value| crate::stdlib::json_to_vm_value(&value));
1363 let session_transcript = crate::agent_sessions::snapshot(&prepared.stage_session_id);
1364 let transcript = result_transcript
1365 .or(session_transcript)
1366 .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1367 let output_kind = node
1368 .output_contract
1369 .output_kinds
1370 .first()
1371 .cloned()
1372 .unwrap_or_else(|| {
1373 if node.kind == "verify" {
1374 "verification_result".to_string()
1375 } else {
1376 "artifact".to_string()
1377 }
1378 });
1379 let mut metadata = BTreeMap::new();
1380 metadata.insert(
1381 "input_artifact_ids".to_string(),
1382 serde_json::json!(prepared
1383 .selected
1384 .iter()
1385 .map(|artifact| artifact.id.clone())
1386 .collect::<Vec<_>>()),
1387 );
1388 metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1389 if !node.approval_policy.write_path_allowlist.is_empty() {
1390 metadata.insert(
1391 "changed_paths".to_string(),
1392 serde_json::json!(node.approval_policy.write_path_allowlist),
1393 );
1394 }
1395 let artifact = ArtifactRecord {
1396 type_name: "artifact".to_string(),
1397 id: new_id("artifact"),
1398 kind: output_kind,
1399 title: Some(format!("stage {node_id} output")),
1400 text: Some(visible_text),
1401 data: Some(llm_result.clone()),
1402 source: Some(node_id.to_string()),
1403 created_at: now_rfc3339(),
1404 freshness: Some("fresh".to_string()),
1405 priority: None,
1406 lineage: prepared
1407 .selected
1408 .iter()
1409 .map(|artifact| artifact.id.clone())
1410 .collect(),
1411 relevance: Some(1.0),
1412 estimated_tokens: None,
1413 stage: Some(node_id.to_string()),
1414 metadata,
1415 }
1416 .normalize();
1417
1418 Ok((llm_result, vec![artifact], transcript))
1419}
1420
1421pub async fn execute_stage_node(
1422 ctx: &crate::vm::AsyncBuiltinCtx,
1423 node_id: &str,
1424 node: &WorkflowNode,
1425 task: &str,
1426 artifacts: &[ArtifactRecord],
1427) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1428 let prepared = prepare_stage_node(ctx, node_id, node, task, artifacts).await?;
1429 let llm_result = if let Some(result) = prepared.result.clone() {
1430 result
1431 } else if prepared.run_agent_loop {
1432 let result = crate::stdlib::harn_entry::call_agent_loop(
1433 ctx,
1434 prepared.prompt.clone(),
1435 prepared.system.clone(),
1436 prepared.agent_loop_options.clone(),
1437 )
1438 .await?;
1439 crate::llm::vm_value_to_json(&result)
1440 } else {
1441 let args = vec![
1442 VmValue::String(arcstr::ArcStr::from(prepared.prompt.clone())),
1443 prepared
1444 .system
1445 .clone()
1446 .map(|s| VmValue::String(arcstr::ArcStr::from(s)))
1447 .unwrap_or(VmValue::Nil),
1448 VmValue::dict(prepared.llm_options.clone()),
1449 ];
1450 let opts = extract_llm_options(&args)?;
1451 let result = vm_call_llm_full(&opts).await?;
1452 crate::llm::agent_loop_result_from_llm(&result, opts)
1453 };
1454 complete_prepared_stage_node(node_id, node, &prepared, llm_result)
1455}
1456
1457pub fn append_audit_entry(
1458 graph: &mut WorkflowGraph,
1459 op: &str,
1460 node_id: Option<String>,
1461 reason: Option<String>,
1462 metadata: BTreeMap<String, serde_json::Value>,
1463) {
1464 graph.audit_log.push(WorkflowAuditEntry {
1465 id: new_id("audit"),
1466 op: op.to_string(),
1467 node_id,
1468 timestamp: now_rfc3339(),
1469 reason,
1470 metadata,
1471 });
1472}
1473
1474#[cfg(test)]
1475mod flatten_tests {
1476 use super::*;
1477 use crate::orchestration::{CapabilityPolicy, WorkflowNode};
1478 use std::collections::BTreeMap;
1479
1480 fn ceiling_with_tools(tools: &[&str]) -> CapabilityPolicy {
1481 CapabilityPolicy {
1482 tools: tools.iter().map(|t| t.to_string()).collect(),
1483 ..Default::default()
1484 }
1485 }
1486
1487 fn options_with_policy(policy: &CapabilityPolicy) -> crate::value::DictMap {
1488 let mut options = crate::value::DictMap::new();
1489 insert_json_vm_option(&mut options, "policy", policy).unwrap();
1490 options
1491 }
1492
1493 #[test]
1494 fn ceiling_pass_through_is_within() {
1495 let ceiling = ceiling_with_tools(&["read", "edit"]);
1496 assert!(ceiling.assert_within_ceiling(&ceiling).is_ok());
1498 let options = options_with_policy(&ceiling);
1499 assert!(enforce_flattened_ceiling(&options, &ceiling).is_ok());
1500 }
1501
1502 #[test]
1503 fn narrowing_is_allowed() {
1504 let ceiling = ceiling_with_tools(&["read", "edit", "run_command"]);
1505 let narrowed = ceiling_with_tools(&["read"]);
1506 assert!(ceiling.assert_within_ceiling(&narrowed).is_ok());
1507 }
1508
1509 #[test]
1510 fn widening_tools_is_rejected() {
1511 let ceiling = ceiling_with_tools(&["read"]);
1512 let widened = ceiling_with_tools(&["read", "run_command"]);
1513 let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1514 assert!(
1515 err.contains("run_command"),
1516 "error names the widened tool: {err}"
1517 );
1518
1519 let options = options_with_policy(&widened);
1521 match enforce_flattened_ceiling(&options, &ceiling) {
1522 Err(VmError::CategorizedError { message, category }) => {
1523 assert_eq!(category, crate::value::ErrorCategory::ToolRejected);
1524 assert!(message.contains("run_command"), "message: {message}");
1525 }
1526 other => panic!("expected a ToolRejected error, got {other:?}"),
1527 }
1528 }
1529
1530 #[test]
1531 fn widening_capability_op_is_rejected() {
1532 let mut ceiling = CapabilityPolicy::default();
1533 ceiling
1534 .capabilities
1535 .insert("fs".to_string(), vec!["read".to_string()]);
1536 let mut widened = CapabilityPolicy::default();
1537 widened.capabilities.insert(
1538 "fs".to_string(),
1539 vec!["read".to_string(), "write".to_string()],
1540 );
1541 let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1542 assert!(err.contains("fs") && err.contains("write"), "error: {err}");
1543 }
1544
1545 #[test]
1546 fn adding_new_capability_is_rejected() {
1547 let mut ceiling = CapabilityPolicy::default();
1548 ceiling
1549 .capabilities
1550 .insert("fs".to_string(), vec!["read".to_string()]);
1551 let mut widened = ceiling.clone();
1552 widened
1553 .capabilities
1554 .insert("net".to_string(), vec!["connect".to_string()]);
1555 let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1556 assert!(
1557 err.contains("net"),
1558 "error names the added capability: {err}"
1559 );
1560 }
1561
1562 #[test]
1563 fn widening_recursion_budget_is_rejected() {
1564 let ceiling = CapabilityPolicy {
1565 recursion_limit: Some(2),
1566 ..Default::default()
1567 };
1568 let widened = CapabilityPolicy {
1569 recursion_limit: Some(9),
1570 ..Default::default()
1571 };
1572 assert!(ceiling.assert_within_ceiling(&widened).is_err());
1573 let dropped = CapabilityPolicy::default();
1575 assert!(ceiling.assert_within_ceiling(&dropped).is_err());
1576 let narrowed = CapabilityPolicy {
1578 recursion_limit: Some(1),
1579 ..Default::default()
1580 };
1581 assert!(ceiling.assert_within_ceiling(&narrowed).is_ok());
1582 }
1583
1584 #[test]
1585 fn widening_roots_is_rejected() {
1586 let ceiling = CapabilityPolicy {
1587 workspace_roots: vec!["/repo".to_string()],
1588 ..Default::default()
1589 };
1590 let widened = CapabilityPolicy {
1591 workspace_roots: vec!["/repo".to_string(), "/etc".to_string()],
1592 ..Default::default()
1593 };
1594 let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1595 assert!(err.contains("/etc"), "error: {err}");
1596 }
1597
1598 #[test]
1599 fn widening_side_effect_level_is_rejected() {
1600 let ceiling = CapabilityPolicy {
1601 side_effect_level: Some("read_only".to_string()),
1602 ..Default::default()
1603 };
1604 let widened = CapabilityPolicy {
1605 side_effect_level: Some("network".to_string()),
1606 ..Default::default()
1607 };
1608 assert!(ceiling.assert_within_ceiling(&widened).is_err());
1609 }
1610
1611 #[test]
1612 fn unknown_side_effect_level_ranks_fail_closed() {
1613 let ceiling = CapabilityPolicy {
1617 side_effect_level: Some("none".to_string()),
1618 ..Default::default()
1619 };
1620 let widened = CapabilityPolicy {
1621 side_effect_level: Some("desktop_control".to_string()),
1622 ..Default::default()
1623 };
1624 assert!(ceiling.assert_within_ceiling(&widened).is_err());
1625 let unknown = CapabilityPolicy {
1628 side_effect_level: Some("teleport".to_string()),
1629 ..Default::default()
1630 };
1631 assert!(ceiling.assert_within_ceiling(&unknown).is_ok());
1632 }
1633
1634 #[test]
1635 fn widening_process_sandbox_roots_is_rejected() {
1636 use crate::orchestration::ProcessSandboxPolicy;
1637 let ceiling = CapabilityPolicy {
1638 process_sandbox: ProcessSandboxPolicy {
1639 write_roots: vec!["/repo/.cache".to_string()],
1640 ..Default::default()
1641 },
1642 ..Default::default()
1643 };
1644 let widened = CapabilityPolicy {
1645 process_sandbox: ProcessSandboxPolicy {
1646 write_roots: vec!["/repo/.cache".to_string(), "/etc".to_string()],
1647 ..Default::default()
1648 },
1649 ..Default::default()
1650 };
1651 let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1652 assert!(
1653 err.contains("process_sandbox.write_roots") && err.contains("/etc"),
1654 "error: {err}"
1655 );
1656 assert!(ceiling
1658 .assert_within_ceiling(&CapabilityPolicy::default())
1659 .is_ok());
1660 }
1661
1662 #[test]
1663 fn injecting_process_sandbox_roots_into_empty_ceiling_is_rejected() {
1664 use crate::orchestration::ProcessSandboxPolicy;
1665 let ceiling = CapabilityPolicy::default();
1670 for (field, requested) in [
1671 (
1672 "process_sandbox.read_roots",
1673 CapabilityPolicy {
1674 process_sandbox: ProcessSandboxPolicy {
1675 read_roots: vec!["/etc".to_string()],
1676 ..Default::default()
1677 },
1678 ..Default::default()
1679 },
1680 ),
1681 (
1682 "process_sandbox.write_roots",
1683 CapabilityPolicy {
1684 process_sandbox: ProcessSandboxPolicy {
1685 write_roots: vec!["/etc".to_string()],
1686 ..Default::default()
1687 },
1688 ..Default::default()
1689 },
1690 ),
1691 ] {
1692 let err = ceiling.assert_within_ceiling(&requested).unwrap_err();
1693 assert!(
1694 err.contains(field) && err.contains("/etc"),
1695 "empty ceiling must reject injected {field}: {err}"
1696 );
1697 }
1698 assert!(ceiling
1700 .assert_within_ceiling(&CapabilityPolicy::default())
1701 .is_ok());
1702 }
1703
1704 #[test]
1705 fn widening_process_sandbox_presets_is_rejected() {
1706 use crate::orchestration::{ProcessSandboxPolicy, ProcessSandboxPreset};
1707 let ceiling = CapabilityPolicy {
1708 process_sandbox: ProcessSandboxPolicy {
1709 presets: Some(vec![ProcessSandboxPreset::SystemRuntime]),
1710 ..Default::default()
1711 },
1712 ..Default::default()
1713 };
1714 let widened = CapabilityPolicy {
1715 process_sandbox: ProcessSandboxPolicy {
1716 presets: Some(vec![
1717 ProcessSandboxPreset::SystemRuntime,
1718 ProcessSandboxPreset::DeveloperToolchains,
1719 ]),
1720 ..Default::default()
1721 },
1722 ..Default::default()
1723 };
1724 let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1725 assert!(err.contains("process_sandbox presets"), "error: {err}");
1726 }
1727
1728 #[test]
1729 fn dropping_tool_arg_constraint_is_rejected() {
1730 use crate::orchestration::ToolArgConstraint;
1731 let constraint = ToolArgConstraint {
1732 tool: "edit".to_string(),
1733 arg_patterns: vec!["src/**".to_string()],
1734 arg_key: Some("path".to_string()),
1735 };
1736 let ceiling = CapabilityPolicy {
1737 tool_arg_constraints: vec![constraint],
1738 ..Default::default()
1739 };
1740 let widened = CapabilityPolicy::default();
1742 let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1743 assert!(
1744 err.contains("tool_arg_constraints") && err.contains("edit"),
1745 "error: {err}"
1746 );
1747 let mut narrowed = ceiling.clone();
1749 narrowed.tool_arg_constraints.push(ToolArgConstraint {
1750 tool: "run_command".to_string(),
1751 arg_patterns: vec!["cargo *".to_string()],
1752 arg_key: None,
1753 });
1754 assert!(ceiling.assert_within_ceiling(&narrowed).is_ok());
1755 }
1756
1757 #[test]
1758 fn weakening_tool_annotation_is_rejected() {
1759 use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema};
1760 let strong = ToolAnnotations {
1761 side_effect_level: SideEffectLevel::ReadOnly,
1762 arg_schema: ToolArgSchema {
1763 path_params: vec!["path".to_string()],
1764 ..Default::default()
1765 },
1766 ..Default::default()
1767 };
1768 let mut ceiling = CapabilityPolicy {
1769 tools: vec!["edit".to_string(), "read".to_string()],
1770 ..Default::default()
1771 };
1772 ceiling
1773 .tool_annotations
1774 .insert("edit".to_string(), strong.clone());
1775
1776 let mut dropped = ceiling.clone();
1779 dropped.tool_annotations.clear();
1780 let err = ceiling.assert_within_ceiling(&dropped).unwrap_err();
1781 assert!(
1782 err.contains("tool_annotations") && err.contains("edit"),
1783 "error: {err}"
1784 );
1785
1786 let mut rewritten = ceiling.clone();
1788 rewritten.tool_annotations.insert(
1789 "edit".to_string(),
1790 ToolAnnotations {
1791 side_effect_level: SideEffectLevel::None,
1792 ..strong
1793 },
1794 );
1795 assert!(ceiling.assert_within_ceiling(&rewritten).is_err());
1796
1797 let narrowed_tools = CapabilityPolicy {
1800 tools: vec!["read".to_string()],
1801 ..Default::default()
1802 };
1803 assert!(ceiling.assert_within_ceiling(&narrowed_tools).is_ok());
1804 }
1805
    /// Rust reference implementation of stage agent-loop option flattening, used by
    /// `flatten_matches_pre_move_rust` as the expected value for the Harn-side flatten.
    ///
    /// Starting from `options`, later steps overwrite earlier keys, so statement order
    /// is part of the behaviour being compared. Panics (via `unwrap`) if the policy
    /// intersection or JSON insertion fails.
    fn legacy_flatten_reference(
        node: &WorkflowNode,
        session_id: &str,
        tool_format: &str,
        mut options: crate::value::DictMap,
        tools_value: &Option<VmValue>,
        tool_names: &[String],
    ) -> crate::value::DictMap {
        // Copy every non-nil entry of the raw model policy over the base options.
        if let Some(raw) = node.raw_model_policy.as_ref().and_then(|v| v.as_dict()) {
            for (key, value) in raw {
                if !matches!(value, VmValue::Nil) {
                    options.insert(key.clone(), value.clone());
                }
            }
        }
        // If no top-level `command_policy` was set, lift it from the nested
        // `policy.command_policy` entry of the raw model policy.
        if !options.contains_key("command_policy") {
            if let Some(command_policy) = node
                .raw_model_policy
                .as_ref()
                .and_then(|v| v.as_dict())
                .and_then(|d| d.get("policy"))
                .and_then(|v| v.as_dict())
                .and_then(|p| p.get("command_policy"))
            {
                options.insert(
                    crate::value::intern_key("command_policy"),
                    command_policy.clone(),
                );
            }
        }
        // Auto-compaction: a disabled policy only records `auto_compact = false`;
        // an enabled one also emits whichever tuning fields are set.
        if !node.auto_compact.enabled {
            options.insert(
                crate::value::intern_key("auto_compact"),
                VmValue::Bool(false),
            );
        } else {
            options.insert(
                crate::value::intern_key("auto_compact"),
                VmValue::Bool(true),
            );
            if let Some(v) = node.auto_compact.token_threshold {
                options.insert(
                    crate::value::intern_key("compact_threshold"),
                    VmValue::Int(v as i64),
                );
            }
            if let Some(v) = node.auto_compact.tool_output_max_chars {
                options.insert(
                    crate::value::intern_key("tool_output_max_chars"),
                    VmValue::Int(v as i64),
                );
            }
            if let Some(v) = node.auto_compact.hard_limit_tokens {
                options.insert(
                    crate::value::intern_key("hard_limit_tokens"),
                    VmValue::Int(v as i64),
                );
            }
            if let Some(s) = node.auto_compact.compact_strategy.as_ref() {
                options.put_str("compact_strategy", s.clone());
            }
            if let Some(s) = node.auto_compact.hard_limit_strategy.as_ref() {
                options.put_str("hard_limit_strategy", s.clone());
            }
            let raw = node.raw_auto_compact.as_ref().and_then(|v| v.as_dict());
            // Prefer a non-negative `compact_keep_last`; otherwise fall back to a
            // non-negative `keep_last`.
            let keep = raw
                .and_then(|d| d.get("compact_keep_last"))
                .and_then(|v| v.as_int())
                .filter(|v| *v >= 0)
                .or_else(|| {
                    raw.and_then(|d| d.get("keep_last"))
                        .and_then(|v| v.as_int())
                        .filter(|v| *v >= 0)
                });
            if let Some(v) = keep {
                options.insert(
                    crate::value::intern_key("compact_keep_last"),
                    VmValue::Int(v),
                );
            }
            // Only a non-blank string counts as a summarize prompt.
            if let Some(p) = raw
                .and_then(|d| d.get("summarize_prompt"))
                .and_then(|v| match v {
                    VmValue::String(t) if !t.trim().is_empty() => Some(t.to_string()),
                    _ => None,
                })
            {
                options.put_str("summarize_prompt", p);
            }
            if let Some(d) = raw {
                // These callbacks keep their key names.
                for key in ["compress_callback", "mask_callback"] {
                    if let Some(cb) = d.get(key) {
                        options.insert(crate::value::intern_key(key), cb.clone());
                    }
                }
                // `custom_compactor` is emitted under the `compact_callback` key.
                if let Some(cb) = d.get("custom_compactor") {
                    options.insert(crate::value::intern_key("compact_callback"), cb.clone());
                }
            }
        }
        // Tools are attached only when the stage names at least one tool.
        if !tool_names.is_empty() {
            if let Some(v) = tools_value.clone() {
                options.insert(crate::value::intern_key("tools"), v);
            }
        }
        // Effective policy = policy derived from the tool spec, intersected with the
        // node's own capability policy.
        let tool_policy = tool_capability_policy_from_spec(&node.tools);
        let effective = tool_policy.intersect(&node.capability_policy).unwrap();
        insert_json_vm_option(&mut options, "policy", &effective).unwrap();
        insert_json_vm_option(&mut options, "approval_policy", &node.approval_policy).unwrap();
        options.put_str("session_id", session_id);
        options.put_str("tool_format", tool_format);
        // Label the nested execution with the node id, falling back to the session id.
        let label = node.id.clone().unwrap_or_else(|| session_id.to_string());
        crate::orchestration::annotate_nested_execution_options(
            &mut options,
            crate::orchestration::NestedExecutionKind::WorkflowStage,
            &label,
        );
        options
    }
1929
1930 fn representative_node() -> WorkflowNode {
1931 let mut raw_model_policy = BTreeMap::new();
1932 raw_model_policy.insert(
1933 "provider".to_string(),
1934 VmValue::String(arcstr::ArcStr::from("anthropic")),
1935 );
1936 raw_model_policy.insert("temperature".to_string(), VmValue::Float(0.2));
1937 let mut nested_policy = BTreeMap::new();
1939 nested_policy.insert(
1940 "command_policy".to_string(),
1941 VmValue::String(arcstr::ArcStr::from("worktree")),
1942 );
1943 raw_model_policy.insert("policy".to_string(), VmValue::dict(nested_policy));
1944 raw_model_policy.insert("nudge".to_string(), VmValue::Nil);
1946
1947 let mut raw_auto_compact = BTreeMap::new();
1948 raw_auto_compact.insert("keep_last".to_string(), VmValue::Int(4));
1949 raw_auto_compact.insert(
1950 "summarize_prompt".to_string(),
1951 VmValue::String(arcstr::ArcStr::from("summarize tersely")),
1952 );
1953
1954 WorkflowNode {
1955 id: Some("act".to_string()),
1956 kind: "stage".to_string(),
1957 mode: Some("agent".to_string()),
1958 tools: serde_json::json!(["read", "edit"]),
1959 auto_compact: crate::orchestration::AutoCompactPolicy {
1960 enabled: true,
1961 token_threshold: Some(8000),
1962 tool_output_max_chars: Some(2000),
1963 hard_limit_tokens: Some(20000),
1964 compact_strategy: Some("summary".to_string()),
1965 hard_limit_strategy: Some("truncate".to_string()),
1966 },
1967 capability_policy: CapabilityPolicy {
1968 tools: vec!["read".to_string(), "edit".to_string()],
1969 recursion_limit: Some(3),
1970 ..Default::default()
1971 },
1972 raw_model_policy: Some(VmValue::dict(raw_model_policy)),
1973 raw_auto_compact: Some(VmValue::dict(raw_auto_compact)),
1974 ..Default::default()
1975 }
1976 }
1977
1978 #[tokio::test(flavor = "current_thread", start_paused = true)]
1979 async fn flatten_matches_pre_move_rust() {
1980 crate::reset_thread_local_state();
1981 let node = representative_node();
1982 let session_id = "session-parity";
1983 let tool_format = "text";
1984 let tool_names = vec!["read".to_string(), "edit".to_string()];
1985 let tools_value = Some(crate::stdlib::json_to_vm_value(&node.tools));
1986
1987 let mut base = crate::value::DictMap::new();
1989 base.insert(
1990 crate::value::intern_key("loop_until_done"),
1991 VmValue::Bool(true),
1992 );
1993 base.insert(crate::value::intern_key("max_iterations"), VmValue::Int(16));
1994
1995 let stage_agent_options = super::super::WorkflowStageAgentOptions {
1996 run_agent_loop: true,
1997 tool_format: tool_format.to_string(),
1998 llm_options: BTreeMap::new(),
1999 agent_loop_options: base
2000 .iter()
2001 .map(|(k, v)| (k.to_string(), vm_value_to_json(v)))
2002 .collect(),
2003 };
2004
2005 let mut vm = crate::Vm::new();
2006 crate::register_vm_stdlib(&mut vm);
2007 let ctx = crate::vm::AsyncBuiltinCtx::for_test(vm);
2008
2009 let flattened = workflow_stage_agent_loop_options(
2010 &ctx,
2011 &node,
2012 session_id,
2013 &tools_value,
2014 &tool_names,
2015 &stage_agent_options,
2016 )
2017 .await
2018 .expect("harn flatten succeeds");
2019
2020 let expected = legacy_flatten_reference(
2021 &node,
2022 session_id,
2023 tool_format,
2024 base,
2025 &tools_value,
2026 &tool_names,
2027 );
2028
2029 let flattened_json = vm_value_to_json(&VmValue::dict(flattened));
2030 let expected_json = vm_value_to_json(&VmValue::dict(expected));
2031 assert_eq!(
2032 flattened_json, expected_json,
2033 "Harn flatten must be dict-equal to the pre-move Rust flatten"
2034 );
2035 }
2036}