Skip to main content

harn_vm/orchestration/
mod.rs

1use std::path::PathBuf;
2use std::{cell::RefCell, thread_local};
3
4use serde::{Deserialize, Serialize};
5
6use crate::llm::vm_value_to_json;
7use crate::value::{VmError, VmValue};
8
/// Returns the current wall-clock time as a string.
///
/// NOTE: despite the name, the value is the number of whole seconds since
/// the Unix epoch rendered in decimal (e.g. `"1700000000"`), not an
/// RFC 3339 timestamp.
pub(crate) fn now_rfc3339() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    // `duration_since` fails when the clock reads earlier than the epoch;
    // that case collapses to a zero duration instead of an error.
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    elapsed.as_secs().to_string()
}
17
18pub(crate) fn new_id(prefix: &str) -> String {
19    format!("{prefix}_{}", uuid::Uuid::now_v7())
20}
21
22pub(crate) fn default_run_dir() -> PathBuf {
23    let base = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
24    crate::runtime_paths::run_root(&base)
25}
26
27mod hooks;
28pub use hooks::*;
29
30mod pipeline_lifecycle;
31pub use pipeline_lifecycle::*;
32
33mod command_policy;
34pub use command_policy::*;
35
36mod compaction;
37pub use compaction::*;
38
39pub mod agent_inbox;
40
41mod artifacts;
42pub use artifacts::*;
43
44mod assemble;
45pub use assemble::*;
46
47mod handoffs;
48pub use handoffs::*;
49
50mod friction;
51pub use friction::*;
52
53mod crystallize;
54pub use crystallize::*;
55
56mod release_fixture;
57pub use release_fixture::*;
58
59mod replay_oracle;
60pub use replay_oracle::*;
61
62mod replay_bench;
63pub use replay_bench::*;
64
65mod policy;
66pub use policy::*;
67
68mod stage_options;
69pub use stage_options::*;
70
71mod workflow;
72pub use workflow::*;
73
74mod workflow_bundle;
75pub use workflow_bundle::*;
76
77mod workflow_patch;
78pub use workflow_patch::*;
79
80mod safe_function_tools;
81pub use safe_function_tools::*;
82
83mod nested_invocation;
84pub use nested_invocation::*;
85
86#[cfg(test)]
87mod workflow_test_fixtures;
88
89mod records;
90pub use records::*;
91
92mod merge_captain_audit;
93pub use merge_captain_audit::*;
94
95mod merge_captain_driver;
96pub use merge_captain_driver::*;
97
98mod merge_captain_ladder;
99pub use merge_captain_ladder::*;
100
101mod merge_captain_iteration;
102pub use merge_captain_iteration::*;
103
104pub mod playground;
105
thread_local! {
    /// Mutation session record for the current thread. Written by
    /// `install_current_mutation_session` and read (as a clone) by
    /// `current_mutation_session`; starts out as `None`.
    static CURRENT_MUTATION_SESSION: RefCell<Option<MutationSessionRecord>> = const { RefCell::new(None) };
    /// Workflow-level skill context, installed by `workflow_execute` so
    /// every per-node agent loop constructed inside `execute_stage_node`
    /// can pick up the same `skills:` / `skill_match:` registry without
    /// threading a new parameter through every helper. Cleared on
    /// workflow exit (success or error) by `WorkflowSkillContextGuard`.
    static CURRENT_WORKFLOW_SKILL_CONTEXT: RefCell<Option<WorkflowSkillContext>> = const { RefCell::new(None) };
}
115
/// Skill wiring threaded from `workflow_execute` into the per-stage
/// agent loops via thread-local context. `VmValue` wraps `Rc` and is
/// not `Send`, so we store it in a thread-local rather than a mutex —
/// the workflow runner pins itself to one task via `LocalSet`, so
/// every stage observes the same context.
#[derive(Clone, Default)]
pub struct WorkflowSkillContext {
    /// Skill registry value, or `None` when no registry is installed.
    /// NOTE(review): presumably the workflow's `skills:` option — confirm
    /// against the caller that builds this context.
    pub registry: Option<VmValue>,
    /// Skill matching configuration, or `None` when not provided.
    /// NOTE(review): presumably the workflow's `skill_match:` option —
    /// confirm against the caller that builds this context.
    pub match_config: Option<VmValue>,
}
126
127pub fn install_workflow_skill_context(context: Option<WorkflowSkillContext>) {
128    CURRENT_WORKFLOW_SKILL_CONTEXT.with(|slot| {
129        *slot.borrow_mut() = context;
130    });
131}
132
133pub fn current_workflow_skill_context() -> Option<WorkflowSkillContext> {
134    CURRENT_WORKFLOW_SKILL_CONTEXT.with(|slot| slot.borrow().clone())
135}
136
137/// RAII guard that clears the workflow skill context on drop. Paired
138/// with `install_workflow_skill_context` at the top of `execute_workflow`
139/// so the context never leaks past a workflow's scope.
140pub struct WorkflowSkillContextGuard;
141
142impl Drop for WorkflowSkillContextGuard {
143    fn drop(&mut self) {
144        install_workflow_skill_context(None);
145    }
146}
147
/// Serializable description of a mutation session.
///
/// One record at a time can be installed per thread with
/// `install_current_mutation_session` and read back with
/// `current_mutation_session`. Because of `#[serde(default)]`, fields
/// missing from serialized input take their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MutationSessionRecord {
    /// Identifier of this session. `normalize` replaces an empty value
    /// with a freshly generated `session_…` id.
    pub session_id: String,
    /// NOTE(review): presumably the id of the session this one was spawned
    /// from — inferred from the name, confirm against callers.
    pub parent_session_id: Option<String>,
    /// NOTE(review): presumably the run this session belongs to — confirm.
    pub run_id: Option<String>,
    /// NOTE(review): presumably the worker executing this session — confirm.
    pub worker_id: Option<String>,
    /// NOTE(review): free-form execution kind label; the accepted values
    /// are not visible here — confirm against callers.
    pub execution_kind: Option<String>,
    /// Mutation scope label. `normalize` replaces an empty value with
    /// `"read_only"`.
    pub mutation_scope: String,
    /// Declarative per-tool approval policy for this session. When `None`,
    /// no policy-driven approval is requested; the session update stream
    /// remains the only host-observable surface for tool dispatch.
    pub approval_policy: Option<ToolApprovalPolicy>,
}
162
163impl MutationSessionRecord {
164    pub fn normalize(mut self) -> Self {
165        if self.session_id.is_empty() {
166            self.session_id = new_id("session");
167        }
168        if self.mutation_scope.is_empty() {
169            self.mutation_scope = "read_only".to_string();
170        }
171        self
172    }
173}
174
175pub fn install_current_mutation_session(session: Option<MutationSessionRecord>) {
176    CURRENT_MUTATION_SESSION.with(|slot| {
177        *slot.borrow_mut() = session.map(MutationSessionRecord::normalize);
178    });
179}
180
181pub fn current_mutation_session() -> Option<MutationSessionRecord> {
182    CURRENT_MUTATION_SESSION.with(|slot| slot.borrow().clone())
183}
184pub(crate) fn parse_json_payload<T: for<'de> Deserialize<'de>>(
185    json: serde_json::Value,
186    label: &str,
187) -> Result<T, VmError> {
188    let payload = json.to_string();
189    let mut deserializer = serde_json::Deserializer::from_str(&payload);
190    let mut tracker = serde_path_to_error::Track::new();
191    let path_deserializer = serde_path_to_error::Deserializer::new(&mut deserializer, &mut tracker);
192    T::deserialize(path_deserializer).map_err(|error| {
193        let snippet = if payload.len() > 600 {
194            format!("{}...", &payload[..600])
195        } else {
196            payload.clone()
197        };
198        VmError::Runtime(format!(
199            "{label} parse error at {}: {} | payload={}",
200            tracker.path(),
201            error,
202            snippet
203        ))
204    })
205}
206
207pub(crate) fn parse_json_value<T: for<'de> Deserialize<'de>>(
208    value: &VmValue,
209) -> Result<T, VmError> {
210    parse_json_payload(vm_value_to_json(value), "orchestration")
211}
212
213#[cfg(test)]
214mod tests;