Skip to main content

harn_vm/orchestration/
mod.rs

1use std::path::PathBuf;
2use std::{cell::RefCell, thread_local};
3
4use serde::{Deserialize, Serialize};
5
6use crate::llm::vm_value_to_json;
7use crate::value::{VmError, VmValue};
8
9pub(crate) fn now_rfc3339() -> String {
10    use std::time::{SystemTime, UNIX_EPOCH};
11    let ts = SystemTime::now()
12        .duration_since(UNIX_EPOCH)
13        .unwrap_or_default()
14        .as_secs();
15    format!("{ts}")
16}
17
18pub(crate) fn new_id(prefix: &str) -> String {
19    format!("{prefix}_{}", uuid::Uuid::now_v7())
20}
21
22pub(crate) fn default_run_dir() -> PathBuf {
23    let base = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
24    crate::runtime_paths::run_root(&base)
25}
26
27mod hooks;
28pub use hooks::*;
29
30mod command_policy;
31pub use command_policy::*;
32
33mod compaction;
34pub use compaction::*;
35
36mod artifacts;
37pub use artifacts::*;
38
39mod assemble;
40pub use assemble::*;
41
42mod handoffs;
43pub use handoffs::*;
44
45mod friction;
46pub use friction::*;
47
48mod crystallize;
49pub use crystallize::*;
50
51mod release_fixture;
52pub use release_fixture::*;
53
54mod replay_oracle;
55pub use replay_oracle::*;
56
57mod replay_bench;
58pub use replay_bench::*;
59
60mod policy;
61pub use policy::*;
62
63mod stage_options;
64pub use stage_options::*;
65
66mod workflow;
67pub use workflow::*;
68
69mod workflow_bundle;
70pub use workflow_bundle::*;
71
72mod workflow_patch;
73pub use workflow_patch::*;
74
75mod safe_function_tools;
76pub use safe_function_tools::*;
77
78mod nested_invocation;
79pub use nested_invocation::*;
80
81#[cfg(test)]
82mod workflow_test_fixtures;
83
84mod records;
85pub use records::*;
86
87mod merge_captain_audit;
88pub use merge_captain_audit::*;
89
90mod merge_captain_driver;
91pub use merge_captain_driver::*;
92
93mod merge_captain_ladder;
94pub use merge_captain_ladder::*;
95
96mod merge_captain_iteration;
97pub use merge_captain_iteration::*;
98
99pub mod playground;
100
101thread_local! {
102    static CURRENT_MUTATION_SESSION: RefCell<Option<MutationSessionRecord>> = const { RefCell::new(None) };
103    /// Workflow-level skill context, installed by `workflow_execute` so
104    /// every per-node agent loop constructed inside `execute_stage_node`
105    /// can pick up the same `skills:` / `skill_match:` registry without
106    /// threading a new parameter through every helper. Cleared on
107    /// workflow exit (success or error) by `WorkflowSkillContextGuard`.
108    static CURRENT_WORKFLOW_SKILL_CONTEXT: RefCell<Option<WorkflowSkillContext>> = const { RefCell::new(None) };
109}
110
111/// Skill wiring threaded from `workflow_execute` into the per-stage
112/// agent loops via thread-local context. `VmValue` wraps `Rc` and is
113/// not `Send`, so we store it in a thread-local rather than a mutex —
114/// the workflow runner pins itself to one task via `LocalSet`, so
115/// every stage observes the same context.
116#[derive(Clone, Default)]
117pub struct WorkflowSkillContext {
118    pub registry: Option<VmValue>,
119    pub match_config: Option<VmValue>,
120}
121
122pub fn install_workflow_skill_context(context: Option<WorkflowSkillContext>) {
123    CURRENT_WORKFLOW_SKILL_CONTEXT.with(|slot| {
124        *slot.borrow_mut() = context;
125    });
126}
127
128pub fn current_workflow_skill_context() -> Option<WorkflowSkillContext> {
129    CURRENT_WORKFLOW_SKILL_CONTEXT.with(|slot| slot.borrow().clone())
130}
131
132/// RAII guard that clears the workflow skill context on drop. Paired
133/// with `install_workflow_skill_context` at the top of `execute_workflow`
134/// so the context never leaks past a workflow's scope.
135pub struct WorkflowSkillContextGuard;
136
137impl Drop for WorkflowSkillContextGuard {
138    fn drop(&mut self) {
139        install_workflow_skill_context(None);
140    }
141}
142
143#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
144#[serde(default)]
145pub struct MutationSessionRecord {
146    pub session_id: String,
147    pub parent_session_id: Option<String>,
148    pub run_id: Option<String>,
149    pub worker_id: Option<String>,
150    pub execution_kind: Option<String>,
151    pub mutation_scope: String,
152    /// Declarative per-tool approval policy for this session. When `None`,
153    /// no policy-driven approval is requested; the session update stream
154    /// remains the only host-observable surface for tool dispatch.
155    pub approval_policy: Option<ToolApprovalPolicy>,
156}
157
158impl MutationSessionRecord {
159    pub fn normalize(mut self) -> Self {
160        if self.session_id.is_empty() {
161            self.session_id = new_id("session");
162        }
163        if self.mutation_scope.is_empty() {
164            self.mutation_scope = "read_only".to_string();
165        }
166        self
167    }
168}
169
170pub fn install_current_mutation_session(session: Option<MutationSessionRecord>) {
171    CURRENT_MUTATION_SESSION.with(|slot| {
172        *slot.borrow_mut() = session.map(MutationSessionRecord::normalize);
173    });
174}
175
176pub fn current_mutation_session() -> Option<MutationSessionRecord> {
177    CURRENT_MUTATION_SESSION.with(|slot| slot.borrow().clone())
178}
179pub(crate) fn parse_json_payload<T: for<'de> Deserialize<'de>>(
180    json: serde_json::Value,
181    label: &str,
182) -> Result<T, VmError> {
183    let payload = json.to_string();
184    let mut deserializer = serde_json::Deserializer::from_str(&payload);
185    let mut tracker = serde_path_to_error::Track::new();
186    let path_deserializer = serde_path_to_error::Deserializer::new(&mut deserializer, &mut tracker);
187    T::deserialize(path_deserializer).map_err(|error| {
188        let snippet = if payload.len() > 600 {
189            format!("{}...", &payload[..600])
190        } else {
191            payload.clone()
192        };
193        VmError::Runtime(format!(
194            "{label} parse error at {}: {} | payload={}",
195            tracker.path(),
196            error,
197            snippet
198        ))
199    })
200}
201
202pub(crate) fn parse_json_value<T: for<'de> Deserialize<'de>>(
203    value: &VmValue,
204) -> Result<T, VmError> {
205    parse_json_payload(vm_value_to_json(value), "orchestration")
206}
207
208#[cfg(test)]
209mod tests;