Skip to main content

harn_vm/orchestration/
mod.rs

1use std::path::PathBuf;
2use std::{cell::RefCell, thread_local};
3
4use serde::{Deserialize, Serialize};
5
6use crate::llm::vm_value_to_json;
7use crate::value::{VmError, VmValue};
8
9pub(crate) fn now_rfc3339() -> String {
10    use std::time::{SystemTime, UNIX_EPOCH};
11    let ts = SystemTime::now()
12        .duration_since(UNIX_EPOCH)
13        .unwrap_or_default()
14        .as_secs();
15    format!("{ts}")
16}
17
18pub(crate) fn new_id(prefix: &str) -> String {
19    format!("{prefix}_{}", uuid::Uuid::now_v7())
20}
21
22pub(crate) fn default_run_dir() -> PathBuf {
23    let base = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
24    crate::runtime_paths::run_root(&base)
25}
26
27mod hooks;
28pub use hooks::*;
29
30mod pipeline_lifecycle;
31pub use pipeline_lifecycle::*;
32
33mod settlement_agent;
34pub use settlement_agent::*;
35
36mod lifecycle_receipts;
37pub use lifecycle_receipts::*;
38
39mod command_policy;
40pub use command_policy::*;
41
42mod compaction;
43pub use compaction::*;
44
45mod compact_lifecycle;
46pub use compact_lifecycle::*;
47
48mod compaction_policy_registry;
49pub use compaction_policy_registry::*;
50
51pub mod agent_inbox;
52
53mod artifacts;
54pub use artifacts::*;
55
56mod assemble;
57pub use assemble::*;
58
59mod handoffs;
60pub use handoffs::*;
61
62mod friction;
63pub use friction::*;
64
65mod crystallize;
66pub use crystallize::*;
67
68mod release_fixture;
69pub use release_fixture::*;
70
71mod replay_oracle;
72pub use replay_oracle::*;
73
74mod replay_bench;
75pub use replay_bench::*;
76
77mod policy;
78pub use policy::*;
79
80mod ambient_scope;
81pub(crate) use ambient_scope::{scope_ambient, AmbientExecutionScope};
82
83mod stage_options;
84pub use stage_options::*;
85
86mod workflow;
87pub use workflow::*;
88
89mod workflow_bundle;
90pub use workflow_bundle::*;
91
92mod workflow_patch;
93pub use workflow_patch::*;
94
95mod safe_function_tools;
96pub use safe_function_tools::*;
97
98mod nested_invocation;
99pub use nested_invocation::*;
100
101#[cfg(test)]
102mod workflow_test_fixtures;
103
104mod records;
105pub use records::*;
106
107mod context_eval;
108pub use context_eval::*;
109
110mod skill_gate;
111pub use skill_gate::*;
112
113mod merge_captain_audit;
114pub use merge_captain_audit::*;
115
116mod merge_captain_driver;
117pub use merge_captain_driver::*;
118
119mod merge_captain_ladder;
120pub use merge_captain_ladder::*;
121
122mod merge_captain_iteration;
123pub use merge_captain_iteration::*;
124
125pub mod playground;
126
127thread_local! {
128    static CURRENT_MUTATION_SESSION: RefCell<Option<MutationSessionRecord>> = const { RefCell::new(None) };
129    /// Workflow-level skill context, installed by `workflow_execute` so
130    /// every per-node agent loop constructed inside `execute_stage_node`
131    /// can pick up the same `skills:` / `skill_match:` registry without
132    /// threading a new parameter through every helper. Cleared on
133    /// workflow exit (success or error) by `WorkflowSkillContextGuard`.
134    static CURRENT_WORKFLOW_SKILL_CONTEXT: RefCell<Option<WorkflowSkillContext>> = const { RefCell::new(None) };
135}
136
137/// Skill wiring threaded from `workflow_execute` into the per-stage
138/// agent loops via thread-local context. The workflow runner pins itself
139/// to one task via `LocalSet`, so every stage observes the same context
140/// without cross-task synchronization.
141#[derive(Clone, Default)]
142pub struct WorkflowSkillContext {
143    pub registry: Option<VmValue>,
144    pub match_config: Option<VmValue>,
145}
146
147pub fn install_workflow_skill_context(context: Option<WorkflowSkillContext>) {
148    CURRENT_WORKFLOW_SKILL_CONTEXT.with(|slot| {
149        *slot.borrow_mut() = context;
150    });
151}
152
153pub fn current_workflow_skill_context() -> Option<WorkflowSkillContext> {
154    CURRENT_WORKFLOW_SKILL_CONTEXT.with(|slot| slot.borrow().clone())
155}
156
157/// RAII guard that clears the workflow skill context on drop. Paired
158/// with `install_workflow_skill_context` at the top of `execute_workflow`
159/// so the context never leaks past a workflow's scope.
160pub struct WorkflowSkillContextGuard;
161
162impl Drop for WorkflowSkillContextGuard {
163    fn drop(&mut self) {
164        install_workflow_skill_context(None);
165    }
166}
167
168#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
169#[serde(default)]
170pub struct MutationSessionRecord {
171    pub session_id: String,
172    pub parent_session_id: Option<String>,
173    pub run_id: Option<String>,
174    pub worker_id: Option<String>,
175    pub execution_kind: Option<String>,
176    pub mutation_scope: String,
177    /// Declarative per-tool approval policy for this session. When `None`,
178    /// no policy-driven approval is requested; the session update stream
179    /// remains the only host-observable surface for tool dispatch.
180    pub approval_policy: Option<ToolApprovalPolicy>,
181}
182
183impl MutationSessionRecord {
184    pub fn normalize(mut self) -> Self {
185        if self.session_id.is_empty() {
186            self.session_id = new_id("session");
187        }
188        if self.mutation_scope.is_empty() {
189            self.mutation_scope = "read_only".to_string();
190        }
191        self
192    }
193}
194
195pub fn install_current_mutation_session(session: Option<MutationSessionRecord>) {
196    CURRENT_MUTATION_SESSION.with(|slot| {
197        *slot.borrow_mut() = session.map(MutationSessionRecord::normalize);
198    });
199}
200
201pub fn current_mutation_session() -> Option<MutationSessionRecord> {
202    CURRENT_MUTATION_SESSION.with(|slot| slot.borrow().clone())
203}
204pub(crate) fn parse_json_payload<T: for<'de> Deserialize<'de>>(
205    json: serde_json::Value,
206    label: &str,
207) -> Result<T, VmError> {
208    let payload = json.to_string();
209    let mut deserializer = serde_json::Deserializer::from_str(&payload);
210    let mut tracker = serde_path_to_error::Track::new();
211    let path_deserializer = serde_path_to_error::Deserializer::new(&mut deserializer, &mut tracker);
212    T::deserialize(path_deserializer).map_err(|error| {
213        let snippet = if payload.len() > 600 {
214            format!("{}...", &payload[..600])
215        } else {
216            payload.clone()
217        };
218        VmError::Runtime(format!(
219            "{label} parse error at {}: {} | payload={}",
220            tracker.path(),
221            error,
222            snippet
223        ))
224    })
225}
226
227pub(crate) fn parse_json_value<T: for<'de> Deserialize<'de>>(
228    value: &VmValue,
229) -> Result<T, VmError> {
230    parse_json_payload(vm_value_to_json(value), "orchestration")
231}
232
233#[cfg(test)]
234mod tests;